基于Canal的MySQL=>ES数据同步方案

本文涉及的产品
RDS MySQL Serverless 基础系列,0.5-2RCU 50GB
日志服务 SLS,月写入数据量 50GB 1个月
云数据库 RDS PostgreSQL,高可用系列 2核4GB
简介: 基于Canal的MySQL=>ES数据同步方案

1、MySQL和ES的主要区别?

1.1 功能性

MySQL作为最常用的DB之一,在DB-Ranking排名常年保持前三,仅次于Oracle。为什么还需要把数据同步至排名只有第八的Elasticsearch?相信不了解这两个数据库的区别的同学很容易有这样的疑问。下面我来解答这个问题。

541842ada18448ef97eaf0666e79d0d4.png


首先两款数据库在功能上的区别:

MySQL Elasticsearch
关系型 搜索引擎
单体 分布式
OLTP OLAP
支持事务 不支持事务
SQL DSL


1.2 性能指标

再来看一下两种数据库的性能对比

MySQL Elasticsearch
检索性能 ★★★ ★★★★★
扩展能力 ★★ ★★★★★
写入实时性 ★★★★ ★★(可配置)
准确性 ★★★★ ★★
支持的数据类型 ★★★ ★★★★
灵活性 ★★★ ★★★★
事务支持 ★★★ ★★★★

可以看到,Elasticsearch在检索和扩展能力方面碾压MySQL。实际上MySQL在数据量达到 2-3千万的时候查询性能开始显著下降,而ES的检索性能,实测在数据量1亿左右,单节点1G内存,无任何优化的前提下仍能达到10ms以内的检索性能。并且ES具有MySQL无法相比的横向扩展能力,具备极高的可用性和扩展性。


1.3 在搜索业务上的区别

同样是检索结果,ES中的“检索”和关系型数据库的“查询”是两个不同的概念。搜索引擎的概念里,“检索”和相关度紧密耦合。


1.3.1 查询

关系数据库中“查询”的概念具有“完全相关性”,即搜索结果要么完全满足要求,要么不满足,而不存在“部分满足”的概念。


1.3.2 检索

而以ES为代表的搜索引擎中的“检索”(泛指全文检索),具备相关度的概念,即和搜索词预期搜索结果相符的程度。影响相关度的因素有很多,搜索量,数据本身等都可能影响最终结果。ES以相关度评分来衡量相关度的结果,这里不什么是相关度评分,如果感兴趣,可以异步“相关度评分”一文。


举个简单的例子:当用户搜索词为“小米手机”的时候,“查询”的执行逻辑为field_name='小米手机'或者field_name like '%小米手机%',即搜索结果的匹配逻辑为完全匹配或包含关系。而如果是检索,假设分词结果为“小米”和“手机”两个词项,搜索结果可能只包含“小米”或者只“手机”,如果设置了同义词等逻辑,匹配出手表、华为也是符合结果的。即搜索结果可能完全不包含搜索词。


2、为什么要做数据同步

MySQL是关系型数据库,而Elasticsearch是搜索引擎的核心组件之一,属于非关系型数据库。有一句经典名言叫:存在即合理,两款数据库既然同时存在,那么他们必然有对方不可取代的地方。

这里只讨论两者最主要的不可替代特性,至于扩展能力、数据类型等问题本文暂不讨论


2.1 检索性能

ES擅长海量数据的检索,支持PB级数据的秒级查询,以亿为单位的数据在ES看来只是起步而已,并且搜索结果具有非完全相关性,而是全文检索。其相关度有专门的算法来计算,主要是TF-IDF和BM25,两种算法这里不赘述,我的文章里有详细介绍。而MySQL虽然也有“全文索引”,但是非常积累,不管存存储效率还是检索效率都远不如ES,这也是其底层的数据结构和算法导致。普通索引目前底层数据结构为B+Trees,这种数据结构不适合存储长文本类型的索引,会导致树的深度过大,检索效率极低。其底层原理我有单独的文章详细介绍(待更新)。


2.2 写入性能

ES是OLAP系统,侧重于海量数据的检索,而写入实时性并不是很高,默认1秒,也就是ES缓冲区Buffer的刷新间隔时间,不了解Elasticsearch写入原理的同学可以暂时忽略。ES并非忽略了对写入性能的优化,而是“有意为之”,其原因就在于基于ES的写入机制,其写入实时性和大数据检索性能是一个二选一的行为。实际上生产环境中我们经常通过“牺牲写入实时性”的操作来换取更高更快的“数据检索”性能。


2.3 事务支持

正因为ES的写入实时性并不高,如果我们需要快速响应用户请求,我们常采取的手段就是使用缓存,但是在很多高并发的场景下,我们需要数据保持强一致性(如银行系统),因此需要使用具有ACID特性的数据库来支持,而MySQL就是一个比较好的选择。


但是如果单单只用MySQL,又无法解决海量数据的检索问题。其实在实际工作中常见的做法就是两者结合起来,通过数据同步的操作将MySQL的实时数据同步至Elasticsearch,两者各司其职互不干扰。


3、Canal

3.1 Canal是啥

canal [kə’næl],译意为水道/管道/沟渠,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费

早期阿里巴巴因为杭州和美国双机房部署,存在跨机房同步的业务需求,实现方式主要是基于业务 trigger 获取增量变更。从 2010 年开始,业务逐步尝试数据库日志解析获取增量变更进行同步,由此衍生出了大量的数据库增量订阅和消费业务。


3.2 应用场景

Elasticsearch 不支持事务。 ES通常在分布式系统架构中承担“搜索引擎”的角色,一般来说解决词类问题,可以把ES和支持ACID特性的关系型数据库结合起来使用。

首先把对数据的更(增删改)操作在RDB中执行,然后把这些动作同步到Elasticsearch。 通过这种方式,你将受益于数据库 ACID 事务支持,并且在 Elasticsearch 中以正确的顺序产生变更。 并发在关系数据库中得到了处理。

MySQL为例,如果要把数据从同步至ES,canal + binary log就是常用的一种增量解决方案。


3.3 原理机制

7ba90dbf94c6a940b6a474309c83ab25.jpg


3.3.1 MySQL主备复制原理

  • MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  • MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  • MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据


3.3.2 canal 工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  • canal 解析 binary log 对象(原始为 byte 流)

7138b15848a0e26bd84862cc01d0b03c.png


3.3 优势

  • 准实时性
  • 性能好
  • 一劳永逸


4 基于canal的MySQL=>ES数据同步方案

4.1 环境:

Java和ES兼容性:https://www.elastic.co/cn/support/matrix#matrix_jvm

  • JDK:1.8
  • Elasticsearch:7.x
  • MySQL: 5.7
  • Canal: 1.1.4


4.2 下载地址:


4.3 步骤

4.3.1 保证Elasticsearch服务可用

使用cancal向ES同步数据之前需要保证ES的服务是可以正常访问的


4.3.2 保证MySQL服务可用

不再赘述


4.3.3 开启MySQL的binary log(主备模式)

配置MySQL:

server_id = 1 #开启主从模式后每个MySQL节点的id
log-bin = mysql-bin #bin-log的存储位置
binlog-format = ROW #选择存储binlog日志方式为ROW模式


重启MySQL服务

验证是否开启成功

SHOW VARIABLES LIKE 'log_bin';
log_bin ON #开启


4.4.4 canal-deployer

配置:conf/example/instance.properties

#canal示例的slaveId
canal.instance.mysql.slaveId=1234
#mysql地址
canal.instance.master.address= 127.0.0.1:3306
#用户名
canal.instance.dbUsername = root
#密码
canal.instance.dbPassword = 123456
#指定需要同步的数据库
canal.instance.defaultDatabaseName = msb_order
#指定编码方式
canal.instance.connectionCharset = UTF-8
#监控的是所有数据库,所有的表改动都会监控到,这样可能会浪费不少性能,可能我只想监控的是某一个数据库下的表。
#  .*\\..*表示监控所有数据库,canal\\..*表示监控canal数据库
canal.instance.filter.regex = .\*\\\\..\*


启动: ./startup.sh(Linux)

验证: demo


4.4.5 canal-admin

配置:conf/application.yml

server:
  port: 8089
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
spring.datasource:
  address: 127.0.0.1:3306
  database: canal_manager
  username: root
  password: 123456
  driver-class-name: com.mysql.jdbc.Driver
  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false
  hikari:
    maximum-pool-size: 30
    minimum-idle: 1
canal:
  adminUser: admin
  adminPasswd: admin

启动管理服务

访问服务:server_ip:8089


4.4.6 canal-adapter

配置:conf/application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null
canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ
  flatMessage: true
  zookeeperHosts:
  syncBatchSize: 1000
  retries: 0
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:
  srcDataSources:
    defaultDS:
      url: jdbc:mysql://127.0.0.1:3306/msb_order?useUnicode=true
      username: root
      password: 123456
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
    groups:
    - groupId: g1
      outerAdapters:
      - name: logger
      - name: es7
        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
        key: exampleKey
        properties:
          mode: rest # transport or rest
          # security.auth: test:123456 #  only used for rest mode
          cluster.name: elasticsearch


配置:conf/es7/my_order.yml

这里要注意,在写SQL语句的时候,要保证a.id as _id,即把id属性改为_id,以保证能正常写入ES的_id字段。

dataSourceKey: defaultDS        # 源数据源的key, 对应上面配置的srcDataSources中的值
outerAdapterKey: exampleKey     # 对应application.yml中es配置的key 
destination: example            # cannal的instance或者MQ的topic
groupId: g1                     # 对应MQ模式下的groupId, 只会同步对应groupId的数据_search
esMapping:
  _index: msb_order             # es 的索引名称
  _type: _doc         # es7 固定'_doc',es8删除
  _id: _id                      # es 的_id, 如果不配置该项必须配置下面的pk项_id则会由es自动分配
  upsert: true
  sql: "SELECT a.id as _id, a.customer_phone, a.customer_name, a.customer_region, a.customer_city, a.customer_district, a.customer_addr, a.failure_phenomenon, a.failure_phenomenon_text, a.book_date, a.service_required, a.buyshop_detail, a.buy_shop, a.buy_way, a.product_type, a.model_no, a.process_type, a.ex_order_no, a.customer_country, a.source, a.matnr, a.buyDate, a.serviceReq, a.create_time, a.result, a.rvstatus, a.ip_address FROM msb_order_1 a"
  commitBatch: 3000  


服务启动

注意:

  • 索引msb_order的mapping必须提前创建好
  • 索引中的id字段是_id,因此需要查询的时候需要id as _id
相关实践学习
每个IT人都想学的“Web应用上云经典架构”实战
本实验从Web应用上云这个最基本的、最普遍的需求出发,帮助IT从业者们通过“阿里云Web应用上云解决方案”,了解一个企业级Web应用上云的常见架构,了解如何构建一个高可用、可扩展的企业级应用架构。
MySQL数据库入门学习
本课程通过最流行的开源数据库MySQL带你了解数据库的世界。   相关的阿里云产品:云数据库RDS MySQL 版 阿里云关系型数据库RDS(Relational Database Service)是一种稳定可靠、可弹性伸缩的在线数据库服务,提供容灾、备份、恢复、迁移等方面的全套解决方案,彻底解决数据库运维的烦恼。 了解产品详情: https://www.aliyun.com/product/rds/mysql 
相关文章
|
2月前
|
关系型数据库 MySQL Java
MySQL 分库分表 + 平滑扩容方案 (秒懂+史上最全)
MySQL 分库分表 + 平滑扩容方案 (秒懂+史上最全)
|
9月前
|
存储 缓存 关系型数据库
【MySQL进阶篇】存储引擎(MySQL体系结构、InnoDB、MyISAM、Memory区别及特点、存储引擎的选择方案)
MySQL的存储引擎是其核心组件之一,负责数据的存储、索引和检索。不同的存储引擎具有不同的功能和特性,可以根据业务需求 选择合适的引擎。本文详细介绍了MySQL体系结构、InnoDB、MyISAM、Memory区别及特点、存储引擎的选择方案。
1548 57
【MySQL进阶篇】存储引擎(MySQL体系结构、InnoDB、MyISAM、Memory区别及特点、存储引擎的选择方案)
|
6月前
|
canal 关系型数据库 MySQL
Canal是怎么伪装成 MySQL slave?
Canal是怎么伪装成 MySQL slave?
|
6月前
|
消息中间件 缓存 NoSQL
缓存与数据库的一致性方案,Redis与Mysql一致性方案,大厂P8的终极方案(图解+秒懂+史上最全)
缓存与数据库的一致性方案,Redis与Mysql一致性方案,大厂P8的终极方案(图解+秒懂+史上最全)
|
7月前
|
SQL 关系型数据库 MySQL
基于SQL Server / MySQL进行百万条数据过滤优化方案
对百万级别数据进行高效过滤查询,需要综合使用索引、查询优化、表分区、统计信息和视图等技术手段。通过合理的数据库设计和查询优化,可以显著提升查询性能,确保系统的高效稳定运行。
236 9
|
8月前
|
监控 关系型数据库 MySQL
Aurora MySQL负载突增应对策略与优化方案
通过以上策略,企业可以有效应对 Aurora MySQL 的负载突增,确保数据库在高负载情况下依然保持高性能和稳定性。这些优化方案涵盖了从架构设计到具体配置和监控的各个方面,能够全面提升数据库的响应速度和处理能力。在实际应用中,应根据具体的业务需求和负载特征,灵活调整和应用这些优化策略。
146 22
|
8月前
|
Java 关系型数据库 MySQL
MySQL 分库分表方案
本文总结了数据库分库分表的相关概念和实践,针对单张表数据量过大及增长迅速的问题,介绍了垂直和水平切分的方式及其适用场景。文章分析了分库分表后可能面临的事务支持、多库结果集合并、跨库join等问题,并列举了几种常见的开源分库分表中间件。最后强调了不建议水平分库分表的原因,帮助读者在规划时规避潜在问题。
934 20
|
SQL DataWorks 关系型数据库
DataWorks操作报错合集之如何处理数据同步时(mysql->hive)报:Render instance failed
DataWorks是阿里云提供的一站式大数据开发与治理平台,支持数据集成、数据开发、数据服务、数据质量管理、数据安全管理等全流程数据处理。在使用DataWorks过程中,可能会遇到各种操作报错。以下是一些常见的报错情况及其可能的原因和解决方法。
197 0
|
11月前
|
监控 关系型数据库 MySQL
深入了解MySQL主从复制:构建高效稳定的数据同步架构
深入了解MySQL主从复制:构建高效稳定的数据同步架构
305 1
|
canal 消息中间件 关系型数据库
Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
【9月更文挑战第1天】Canal作为一款高效、可靠的数据同步工具,凭借其基于MySQL binlog的增量同步机制,在数据同步领域展现了强大的应用价值
1545 4

推荐镜像

更多