您当前的位置:首页 > 电脑百科 > 数据库 > 百科

canal+Kafka实现mysql与redis数据同步

时间:2022-01-04 10:02:58  来源:  作者:仰望四十五度的光

前言

上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据发生变更操作,就不得不在业务代码中写一段同步更新redis的代码,但是这种 数据同步的代码和业务代码糅合在一起 看起来不是很优雅,而且还会出现数据不一致问题。那能不能把这部分同步代码从中抽离出来,形成独立模块呢?答案是肯定的,下面通过canal结合Kafka来实现MySQL与redis之间的数据同步。

架构设计

canal是一个伪装成slave订阅mysql的binlog,实现数据同步的中间件。上一篇文章 canal入门 中简单介绍了使用方式,即tcp模式;其实canal也是支持直接发送到MQ中,比如:Kafka、RocketMQ、RabbitMQ。本文采用Kafka讲解,实现mysql与redis之间的数据同步。

通过上述结构设计图可以很清晰的知道用到的组件:MySQL、Canal、Kafka、ZooKeeper、Redis。

canal+Kafka实现mysql与redis数据同步

 

Kafka&Zookeeper搭建

首先在官网下载Kafka:

canal+Kafka实现mysql与redis数据同步

 

下载后解压文件夹,可以看到以下几个文件:

canal+Kafka实现mysql与redis数据同步

 

Kafka内部自带了zookeeper,所以暂不需要去下载搭建zookeeper集群,本文就使用Kafka自带zookeeper来实现。

canal+Kafka实现mysql与redis数据同步

 

通过上述zookeeper启动命令以及Kafka启动命令把服务启动,可以通过以下简单实现下是否成功:

# 命令常见一个canaltopic 队列
kafka-topics --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic canaltopic

Canal搭建

canal搭建具体可以参考上文,这里只讲解具体的参数配置:

找到/conf目录下的canal.properties配置文件:

# tcp, kafka, RocketMQ 这里选择kafka模式
canal.serverMode = kafka
# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况
canal.instance.parser.parallelThreadSize = 16
# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口
canal.mq.servers = 127.0.0.1:9092
# 配置instance,在conf目录下要有example同名的目录,可以配置多个
canal.destinations = example

然后配置instance,找到
/conf/example/instance.properties配置文件:

## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)
# canal.instance.mysql.slaveId=0


# position info
canal.instance.master.address=127.0.0.1:3306
# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlog
canal.instance.master.journal.name=mysql-bin.000006
canal.instance.master.position=4596
# 账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@****
canal.instance.connectionCharset = UTF-8
#MQ队列名称
canal.mq.topic=canaltopic
#单队列模式的分区下标
canal.mq.partition=0

经过上述配置后,就可以启动canal了。

测试

环境搭建完成后,就可以编写代码进行测试。

1、引入pom依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2、封装Redis工具类

Application.yml文件增加以下配置:

spring:  
  redis:
    host: 127.0.0.1
    port: 6379
    database: 0
    password: 123456

封装一个操作Redis的工具类:

@Component
public class RedisClient {


    /**
     * 获取redis模版
     */
    @Resource
    private StringRedisTemplate stringRedisTemplate;


    /**
     * 设置redis的key-value
     */
    public void setString(String key, String value) {
        setString(key, value, null);
    }


    /**
     * 设置redis的key-value,带过期时间
     */
    public void setString(String key, String value, Long timeOut) {
        stringRedisTemplate.opsForValue().set(key, value);
        if (timeOut != null) {
            stringRedisTemplate.expire(key, timeOut, TimeUnit.SECONDS);
        }
    }


    /**
     * 获取redis中key对应的值
     */
    public String getString(String key) {
        return stringRedisTemplate.opsForValue().get(key);
    }


    /**
     * 删除redis中key对应的值
     */
    public Boolean deleteKey(String key) {
        return stringRedisTemplate.delete(key);
    }
}

3、创建MQ消费者进行同步

在application.yml配置文件加上kafka的配置信息:
spring:
  kafka:
      # Kafka服务地址
    bootstrap-servers: 127.0.0.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: consumer-group1
      #序列化反序列化
      key-deserializer: org.Apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringDeserializer
      value-serializer: org.apache.kafka.common.serialization.StringDeserializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288

创建一个CanalBean对象进行接收:

public class CanalBean {
    //数据
    private List<TbCommodityInfo> data;
    //数据库名称
    private String database;
    private long es;
    //递增,从1开始
    private int id;
    //是否是DDL语句
    private boolean isDdl;
    //表结构的字段类型
    private MysqlType mysqlType;
    //UPDATE语句,旧数据
    private String old;
    //主键名称
    private List<String> pkNames;
    //sql语句
    private String sql;
    private SqlType sqlType;
    //表名
    private String table;
    private long ts;
    //(新增)INSERT、(更新)UPDATE、(删除)DELETE、(删除表)ERASE等等
    private String type;
    //getter、setter方法
}


public class MysqlType {
    private String id;
    private String commodity_name;
    private String commodity_price;
    private String number;
    private String description;
    //getter、setter方法
}


public class SqlType {
    private int id;
    private int commodity_name;
    private int commodity_price;
    private int number;
    private int description;
}

最后就可以创建一个消费者CanalConsumer进行消费:

@Slf4j
@Component
public class CanalConsumer {


    @Resource
    private RedisClient redisClient;


    @KafkaListener(topics = "canaltopic")
    public void receive(ConsumerRecord<?, ?> consumer) {
        String value = (String) consumer.value();
        log.info("topic名称:{},key:{},分区位置:{},下标:{},value:{}", consumer.topic(), consumer.key(),
                consumer.partition(), consumer.offset(), value);
        //转换为JAVABean
        CanalBean canalBean = JSONObject.parseobject(value, CanalBean.class);
        //获取是否是DDL语句
        boolean isDdl = canalBean.hasDdl();
        //获取类型
        String type = canalBean.getType();
        //不是DDL语句
        if (!isDdl) {
            List<TbCommodityInfo> tbCommodityInfos = canalBean.getData();
            //过期时间
            long TIME_OUT = 600L;
            if ("INSERT".equals(type)) {
                //新增语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    log.info("新增数据到redis, id: {}, data: {}", id, JSONObject.toJSONString(tbCommodityInfo));
                    //新增到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                    log.info("从redis获取数据 result: {}", JSONObject.toJSONString(redisClient.getString(id)));
                }
            } else if ("UPDATE".equals(type)) {
                //更新语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    log.info("修改数据到redis, id: {}, data: {}", id, JSONObject.toJSONString(tbCommodityInfo));
                    //更新到redis中,过期时间是10分钟
                    redisClient.setString(id, JSONObject.toJSONString(tbCommodityInfo), TIME_OUT);
                }
            } else {
                //删除语句
                for (TbCommodityInfo tbCommodityInfo : tbCommodityInfos) {
                    String id = tbCommodityInfo.getId();
                    log.info("删除数据从redis, id: {}", id);
                    //从redis中删除
                    redisClient.deleteKey(id);
                }
            }
        }
    }


}

测试Mysql与Redis同步

mysql对应的表结构如下:

CREATE TABLE `tb_commodity_info` (
  `id` varchar(32) NOT NULL,
  `commodity_name` varchar(512) DEFAULT NULL COMMENT '商品名称',
  `commodity_price` varchar(36) DEFAULT '0' COMMENT '商品价格',
  `number` int(10) DEFAULT '0' COMMENT '商品数量',
  `description` varchar(2048) DEFAULT '' COMMENT '商品描述',
  PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='商品信息表';

启动项目后,新增一条数据:

INSERT INTO `canaldb`.`tb_commodity_info` (`id`, `commodity_name`, `commodity_price`, `number`, `description`) VALUES ('3e71a81fd80711eaaed600163e046cc3', '叉烧包', '3.99', '3', '又大又香的叉烧包,老人小孩都喜欢');

可以在控制台看到以下输出:

2022-01-02 18:12:51.317  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : 新增数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"叉烧包","commodity_price":"3.99","description":"又大又香的叉烧包,老人小孩都喜欢","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}
2022-01-02 18:12:51.320  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : 从redis获取数据 result: "{"commodity_name":"叉烧包","commodity_price":"3.99","description":"又大又香的叉烧包,老人小孩都喜欢","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}"

如果更新呢?试一下Update语句:

UPDATE `canaldb`.`tb_commodity_info` SET `commodity_name`='青菜包',`description`='很便宜的青菜包呀,不买也开看看了喂' WHERE `id`='3e71a81fd80711eaaed600163e046cc3';

同样可以在控制台看到以下输出:

2022-01-02 18:14:44.613  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : topic名称:canaltopic,key:null,分区位置:0,下标:6,value:{"data":[{"id":"3e71a81fd80711eaaed600163e046cc3","commodity_name":"青菜包","commodity_price":"3.99","number":"3","description":"很便宜的青菜包呀,不买也开看看了喂"}],"database":"study","es":1641118484000,"id":7,"isDdl":false,"mysqlType":{"id":"varchar(32)","commodity_name":"varchar(512)","commodity_price":"varchar(36)","number":"int(10)","description":"varchar(2048)"},"old":[{"commodity_name":"叉烧包","description":"又大又香的叉烧包,老人小孩都喜欢"}],"pkNames":["id"],"sql":"","sqlType":{"id":12,"commodity_name":12,"commodity_price":12,"number":4,"description":12},"table":"tb_commodity_info","ts":1641118484602,"type":"UPDATE"}
2022-01-02 18:14:44.616  INFO 55608 --- [ntainer#0-0-C-1] c.kingoe.canaldj.mqcanal.CanalConsumer   : 修改数据到redis, id: 3e71a81fd80711eaaed600163e046cc3, data: {"commodity_name":"青菜包","commodity_price":"3.99","description":"很便宜的青菜包呀,不买也开看看了喂","id":"3e71a81fd80711eaaed600163e046cc3","number":"3"}

经过测试完全么有问题。

 

总结

既然canal这么强大,难道就没缺点嘛?答案当然是存在的啦,比如:canal只能同步增量数据、不是实时同步而是准实时同步、MQ顺序问题等;尽管有一些缺点,毕竟没有一样技术或者产品是完美的,最重要是合适。比如公司目前有个视图服务提供宽表搜索查询功能就是通过 同步Mysql数据到Es采用Canal+Kafka的方式来实现的。

如果你觉得这篇文章对你有用,点个赞吧~ 你的点赞是我创作的最大动力~想第一时间看到我更新的文章,可以微信搜索公众号「CodingCode」。



Tags:数据同步   点击:()  评论:()
声明:本站部分内容及图片来自互联网,转载是出于传递更多信息之目的,内容观点仅代表作者本人,如有任何标注错误或版权侵犯请与我们联系(Email:2595517585@qq.com),我们将及时更正、删除,谢谢。
▌相关推荐
前言上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据...【详细内容】
2022-01-04  Tags: 数据同步  点击:(0)  评论:(0)  加入收藏
本文根据罗代均老师在〖deeplus直播第261期〗线上分享演讲内容整理而成(文末有获取本期回放的方式,不要错过) 罗代均OPPO安第斯系统资深工程师 负责OPPO后端体系建设,包括API...【详细内容】
2021-04-22  Tags: 数据同步  点击:(296)  评论:(0)  加入收藏
重要说明:需要同步的表必须要有主键 主键 主键otter是一款基于Java且免费、开源基于数据库增量日志解析,准实时同步到本机房或异地机房的mysql/oracle数据库的解决方案。 Otte...【详细内容】
2019-11-13  Tags: 数据同步  点击:(264)  评论:(0)  加入收藏
前言如今大型的IT系统中,都会使用分布式的方式,同时会有非常多的中间件,如redis、消息队列、大数据存储等,但是实际核心的数据存储依然是存储在数据库,作为使用最广泛的数据库,如...【详细内容】
2019-08-30  Tags: 数据同步  点击:(327)  评论:(0)  加入收藏
▌简易百科推荐
前言上篇文章简单介绍canal概念,本文结合常见的缓存业务去讲解canal使用。在实际开发过程中,通常都会把数据往redis缓存中保存一份,做下简单的查询优化。如果这时候数据库数据...【详细内容】
2022-01-04  仰望四十五度的光    Tags:数据同步   点击:(0)  评论:(0)  加入收藏
mysql自动备份脚本::mysqldump -u root -p密码 test_zc >d:\bak.sql::pause@@echo offset errorlevel=0set path_home_mysql=D:\mysql-8.0.27-winx64\::MySQL安装的BIN目录...【详细内容】
2021-12-30  alantop    Tags:数据库脚本   点击:(7)  评论:(0)  加入收藏
1992年11月,Microsoft Access 1.0版本发布。同时,这也是Access数据库,第一次进入大家的视野。起初,Access的原名并不叫Access,而叫Cirrus。Ciruus开发于Visual Basic之前,当时的...【详细内容】
2021-12-29  乐图软件    Tags:Access   点击:(12)  评论:(0)  加入收藏
1增1.1【插入单行】insert [into] <表名> (列名) values (列值)例:insert into Strdents (姓名,性别,出生日期) values (&#39;开心朋朋&#39;,&#39;男&#39;,&#39;1980/6/15&#3...【详细内容】
2021-12-27  快乐火车9d3    Tags:SQL   点击:(11)  评论:(0)  加入收藏
最近发现还有不少做开发的小伙伴,在写存储过程的时候,在参考已有的不同的写法时,往往很迷茫, 不知道各种写法孰优孰劣,该选用哪种写法,以及各种写法的优缺点,本文以一个简单的查询...【详细内容】
2021-12-23  linux上的码农    Tags:sql   点击:(12)  评论:(0)  加入收藏
《开源精选》是我们分享Github、Gitee等开源社区中优质项目的栏目,包括技术、学习、实用与各种有趣的内容。本期推荐的HasorDB 是一个全功能数据库访问工具,提供对象映射、丰...【详细内容】
2021-12-22  GitHub精选    Tags:HasorDB   点击:(10)  评论:(0)  加入收藏
作者丨Rafal Grzegorczyk译者丨陈骏策划丨孙淑娟【51CTO.com原创稿件】您是否还在手动对数据库执行各种脚本?您是否还在浪费时间去验证数据库脚本的正确性?您是否还需要将...【详细内容】
2021-12-22    51CTO  Tags:Liquibase   点击:(10)  评论:(0)  加入收藏
场景描述:由于生产环境的表比较复杂,字段很多。这里我们做下简化,只为说明今天要聊的问题。有两张表 tab1,tab2: tab1 数据如下: tab2 数据如下: 然后给你看下,我用来统计 name=&#3...【详细内容】
2021-12-20  Bald    Tags:SQL   点击:(8)  评论:(0)  加入收藏
前言知识无底,学海无涯,知识点虽然简单,但是比较多,所以将MySQL的基础写出来,方便自己以后查找,还有就是分享给大家。一、SQL简述1.SQL的概述Structure Query Language(结构化查...【详细内容】
2021-12-16  谣言止于独立思考    Tags:SQL基础   点击:(15)  评论:(0)  加入收藏
前言作为一名测试工程师,工作中在对测试结果进行数据比对的时候,或多或少要和数据库打交道的,要和数据库打交道,那么一些常用的 SQL 查询语法必须要掌握。最近有部分做测试小伙...【详细内容】
2021-12-14  柠檬班软件测试    Tags:SQL   点击:(16)  评论:(0)  加入收藏
最新更新
栏目热门
栏目头条