您当前的位置:首页 > 电脑百科 > 数据库 > MYSQL

详解 canal 同步 MySQL 增量数据到 ES

时间:2023-08-31 14:54:57  来源:  作者:勇哥java实战分享

canal 是阿里知名的开源项目,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。

这篇文章,我们手把手向同学们展示使用 canal 将 MySQL 增量数据同步到 ES 。

图片

1 集群模式

图片

图中 server 对应一个 canal 运行实例 ,对应一个 JVM 。

server 中包含 1..n 个 instance , 我们可以将 instance 理解为配置任务。

instance 包含如下模块 :

  • eventParser数据源接入,模拟 slave 协议和 master 进行交互,协议解析
  • eventSinkParser 和 Store 链接器,进行数据过滤,加工,分发的工作
  • eventStore数据存储
  • metaManager增量订阅 & 消费信息管理器

真实场景中,canal 高可用依赖 zookeeper ,笔者将客户端模式可以简单划分为:TCP 模式 和 MQ 模式 。

实战中我们经常会使用 MQ 模式 。因为 MQ 模式的优势在于解耦 ,canal server 将数据变更信息发送到消息队列 kafka 或者 RocketMQ ,消费者消费消息,顺序执行相关逻辑即可。

顺序消费:

对于指定的一个 Topic ,所有消息根据 Sharding Key 进行区块分区,同一个分区内的消息按照严格的先进先出(FIFO)原则进行发布和消费。同一分区内的消息保证顺序,不同分区之间的消息顺序不做要求。

图片

2 MySQL配置

1、对于自建 MySQL , 需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf  中配置如下

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

注意:针对阿里云 RDS for MySQL , 默认打开了 binlog , 并且账号默认具有 binlog dump 权限 , 不需要任何权限或者 binlog 设置,可以直接跳过这一步。

2、授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant 。

CREATE USER canal IDENTIFIED BY 'canal';  
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;

3、创建数据库商品表 t_product 。

CREATE TABLE `t_product` (
 `id` BIGINT ( 20 ) NOT NULL AUTO_INCREMENT,
 `name` VARCHAR ( 255 ) COLLATE utf8mb4_bin NOT NULL,
 `price` DECIMAL ( 10, 2 ) NOT NULL,
 `status` TINYINT ( 4 ) NOT NULL,
 `create_time` datetime NOT NULL,
 `update_time` datetime NOT NULL,
   PRIMARY KEY ( `id` ) 
) ENGINE = INNODB DEFAULT CHARSET = utf8mb4 COLLATE = utf8mb4_bin

3 Elasticsearch配置

使用 Kibana 创建商品索引 。

PUT /t_product
{
    "settings": {
        "number_of_shards": 2,
        "number_of_replicas": 1
    },
    "mAppings": {
            "properties": {
               "id": {
                    "type":"keyword"
                },
                "name": {
                    "type":"text"
                },
                "price": {
                    "type":"double"
                },
                "status": {
                    "type":"integer"
                },
                "createTime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                },
                "updateTime": {
                    "type": "date",
                    "format": "yyyy-MM-dd HH:mm:ss||yyyy-MM-dd||epoch_millis"
                }
        }
    }
}

执行完成,如图所示 :

图片

4 RocketMQ 配置

创建主题:product-syn-topic ,canal 会将 Binlog 的变化数据发送到该主题。

图片图片

5 canal 配置

我们选取 canal 版本 1.1.6 ,进入 conf 目录。

1、配置 canal.properties

#集群模式 zk地址
canal.zkServers = localhost:2181
#本质是MQ模式和tcp模式 tcp, kafka, rocketMQ, rabbitMQ, pulsarMQ
canal.serverMode = rocketMQ
#instance 列表
canal.destinations = product-syn
#conf root dir
canal.conf.dir = ../conf
#全局的spring配置方式的组件文件 生产环境,集群化部署
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

######  以下部分是默认值 展示出来 
# Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下)
canal.mq.canalBatchSize = 50
# Canal get数据的超时时间, 单位: 毫秒, 空为不限超时
canal.mq.canalGetTimeout = 100
# 是否为 flat json格式对象
canal.mq.flatMessage = true

2、instance 配置文件

在 conf 目录下创建实例目录 product-syn  , 在 product-syn 目录创建配置文件 :instance.properties。

#  按需修改成自己的数据库信息
#################################################
...
canal.instance.master.address=192.168.1.20:3306
# username/password,数据库的用户名和密码
...
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
...

# table regex 
canal.instance.filter.regex=mytest.t_product

# mq config
canal.mq.topic=product-syn-topic
# 针对库名或者表名发送动态topic
#canal.mq.dynamicTopic=mytest,.*,mytest.user,mytest\..*,.*\..*
canal.mq.partitinotallow=0
# hash partition config
#canal.mq.partitinotallow=3
#库名.表名: 唯一主键,多个表之间用逗号分隔
#canal.mq.partitinotallow=mytest.person:id,mytest.role:id
#################################################

3、服务启动

启动两个 canal 服务,我们从 zookeeper gui 中查看服务运行情况 。

图片

修改一条 t_product 表记录,可以从 RocketMQ 控制台中观测到新的消息。

图片

6 消费者

1、产品索引操作服务

图片

2、消费监听器

图片

消费者逻辑重点有两点:

  • 顺序消费监听器
  • 将消息数据转换成  JSON 字符串,从 data 节点中获取表最新数据(批量操作可能是多条)。然后根据操作类型 UPDATE、 INSERT、DELETE 执行产品索引操作服务的方法。

7 写到最后

canal 是一个非常有趣的开源项目,很多公司使用 canal 构建数据传输服务( Data Transmission Service ,简称 DTS ) 。

推荐大家阅读这个开源项目,你可以从中学习到网络编程、多线程模型、高性能队列 Disruptor、 流程模型抽象等。

这篇文章涉及到的代码已收录到下面的工程中,有兴趣的同学可以一看。

https://Github.com/makemyownlife/rocketmq4-learning

图片图片



Tags:MySQL   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,不构成投资建议。投资者据此操作,风险自担。如有任何标注错误或版权侵犯请与我们联系,我们将及时更正、删除。
▌相关推荐
MySQL 核心模块揭秘
server 层会创建一个 SAVEPOINT 对象,用于存放 savepoint 信息。binlog 会把 binlog offset 写入 server 层为它分配的一块 8 字节的内存里。 InnoDB 会维护自己的 savepoint...【详细内容】
2024-04-03  Search: MySQL  点击:(5)  评论:(0)  加入收藏
MySQL 核心模块揭秘,你看明白了吗?
为了提升分配 undo 段的效率,事务提交过程中,InnoDB 会缓存一些 undo 段。只要同时满足两个条件,insert undo 段或 update undo 段就能被缓存。1. 关于缓存 undo 段为了提升分...【详细内容】
2024-03-27  Search: MySQL  点击:(10)  评论:(0)  加入收藏
MySQL:BUG导致DDL语句无谓的索引重建
对于5.7.23之前的版本在评估类似DDL操作的时候需要谨慎,可能评估为瞬间操作,但是实际上线的时候跑了很久,这个就容易导致超过维护窗口,甚至更大的故障。一、问题模拟使用5.7.22...【详细内容】
2024-03-26  Search: MySQL  点击:(9)  评论:(0)  加入收藏
从 MySQL 到 ByteHouse,抖音精准推荐存储架构重构解读
ByteHouse是一款OLAP引擎,具备查询效率高的特点,在硬件需求上相对较低,且具有良好的水平扩展性,如果数据量进一步增长,可以通过增加服务器数量来提升处理能力。本文将从兴趣圈层...【详细内容】
2024-03-22  Search: MySQL  点击:(23)  评论:(0)  加入收藏
MySQL自增主键一定是连续的吗?
测试环境:MySQL版本:8.0数据库表:T (主键id,唯一索引c,普通字段d)如果你的业务设计依赖于自增主键的连续性,这个设计假设自增主键是连续的。但实际上,这样的假设是错的,因为自增主键不...【详细内容】
2024-03-10  Search: MySQL  点击:(5)  评论:(0)  加入收藏
准线上事故之MySQL优化器索引选错
1 背景最近组里来了许多新的小伙伴,大家在一起聊聊技术,有小兄弟提到了MySQL的优化器的内部策略,想起了之前在公司出现的一个线上问题,今天借着这个机会,在这里分享下过程和结论...【详细内容】
2024-03-07  Search: MySQL  点击:(26)  评论:(0)  加入收藏
MySQL数据恢复,你会吗?
今天分享一下binlog2sql,它是一款比较常用的数据恢复工具,可以通过它从MySQL binlog解析出你要的SQL,并根据不同选项,可以得到原始SQL、回滚SQL、去除主键的INSERT SQL等。主要...【详细内容】
2024-02-22  Search: MySQL  点击:(43)  评论:(0)  加入收藏
如何在MySQL中实现数据的版本管理和回滚操作?
实现数据的版本管理和回滚操作在MySQL中可以通过以下几种方式实现,包括使用事务、备份恢复、日志和版本控制工具等。下面将详细介绍这些方法。1.使用事务:MySQL支持事务操作,可...【详细内容】
2024-02-20  Search: MySQL  点击:(51)  评论:(0)  加入收藏
为什么高性能场景选用Postgres SQL 而不是 MySQL
一、 数据库简介 TLDR;1.1 MySQL MySQL声称自己是最流行的开源数据库,它属于最流行的RDBMS (Relational Database Management System,关系数据库管理系统)应用软件之一。LAMP...【详细内容】
2024-02-19  Search: MySQL  点击:(37)  评论:(0)  加入收藏
MySQL数据库如何生成分组排序的序号
经常进行数据分析的小伙伴经常会需要生成序号或进行数据分组排序并生成序号。在MySQL8.0中可以使用窗口函数来实现,可以参考历史文章有了这些函数,统计分析事半功倍进行了解。...【详细内容】
2024-01-30  Search: MySQL  点击:(53)  评论:(0)  加入收藏
▌简易百科推荐
MySQL 核心模块揭秘
server 层会创建一个 SAVEPOINT 对象,用于存放 savepoint 信息。binlog 会把 binlog offset 写入 server 层为它分配的一块 8 字节的内存里。 InnoDB 会维护自己的 savepoint...【详细内容】
2024-04-03  爱可生开源社区    Tags:MySQL   点击:(5)  评论:(0)  加入收藏
MySQL 核心模块揭秘,你看明白了吗?
为了提升分配 undo 段的效率,事务提交过程中,InnoDB 会缓存一些 undo 段。只要同时满足两个条件,insert undo 段或 update undo 段就能被缓存。1. 关于缓存 undo 段为了提升分...【详细内容】
2024-03-27  爱可生开源社区  微信公众号  Tags:MySQL   点击:(10)  评论:(0)  加入收藏
MySQL:BUG导致DDL语句无谓的索引重建
对于5.7.23之前的版本在评估类似DDL操作的时候需要谨慎,可能评估为瞬间操作,但是实际上线的时候跑了很久,这个就容易导致超过维护窗口,甚至更大的故障。一、问题模拟使用5.7.22...【详细内容】
2024-03-26  MySQL学习  微信公众号  Tags:MySQL   点击:(9)  评论:(0)  加入收藏
从 MySQL 到 ByteHouse,抖音精准推荐存储架构重构解读
ByteHouse是一款OLAP引擎,具备查询效率高的特点,在硬件需求上相对较低,且具有良好的水平扩展性,如果数据量进一步增长,可以通过增加服务器数量来提升处理能力。本文将从兴趣圈层...【详细内容】
2024-03-22  字节跳动技术团队    Tags:ByteHouse   点击:(23)  评论:(0)  加入收藏
MySQL自增主键一定是连续的吗?
测试环境:MySQL版本:8.0数据库表:T (主键id,唯一索引c,普通字段d)如果你的业务设计依赖于自增主键的连续性,这个设计假设自增主键是连续的。但实际上,这样的假设是错的,因为自增主键不...【详细内容】
2024-03-10    dbaplus社群  Tags:MySQL   点击:(5)  评论:(0)  加入收藏
准线上事故之MySQL优化器索引选错
1 背景最近组里来了许多新的小伙伴,大家在一起聊聊技术,有小兄弟提到了MySQL的优化器的内部策略,想起了之前在公司出现的一个线上问题,今天借着这个机会,在这里分享下过程和结论...【详细内容】
2024-03-07  转转技术  微信公众号  Tags:MySQL   点击:(26)  评论:(0)  加入收藏
MySQL数据恢复,你会吗?
今天分享一下binlog2sql,它是一款比较常用的数据恢复工具,可以通过它从MySQL binlog解析出你要的SQL,并根据不同选项,可以得到原始SQL、回滚SQL、去除主键的INSERT SQL等。主要...【详细内容】
2024-02-22  数据库干货铺  微信公众号  Tags:MySQL   点击:(43)  评论:(0)  加入收藏
如何在MySQL中实现数据的版本管理和回滚操作?
实现数据的版本管理和回滚操作在MySQL中可以通过以下几种方式实现,包括使用事务、备份恢复、日志和版本控制工具等。下面将详细介绍这些方法。1.使用事务:MySQL支持事务操作,可...【详细内容】
2024-02-20  编程技术汇    Tags:MySQL   点击:(51)  评论:(0)  加入收藏
MySQL数据库如何生成分组排序的序号
经常进行数据分析的小伙伴经常会需要生成序号或进行数据分组排序并生成序号。在MySQL8.0中可以使用窗口函数来实现,可以参考历史文章有了这些函数,统计分析事半功倍进行了解。...【详细内容】
2024-01-30  数据库干货铺  微信公众号  Tags:MySQL   点击:(53)  评论:(0)  加入收藏
mysql索引失效的场景
MySQL中索引失效是指数据库查询时无法有效利用索引,这可能导致查询性能显著下降。以下是一些常见的MySQL索引失效的场景:1.使用非前导列进行查询: 假设有一个复合索引 (A, B)。...【详细内容】
2024-01-15  小王爱编程  今日头条  Tags:mysql索引   点击:(82)  评论:(0)  加入收藏
站内最新
站内热门
站内头条