系统重构数据同步利器之Canal实战篇

Posted 浅谈架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了系统重构数据同步利器之Canal实战篇相关的知识,希望对你有一定的参考价值。

一、背景

二话不说,先上图

上图来自于官网(https://github.com/alibaba/canal),基本上涵盖了目前生产环境使用场景了,众所周知,Canal做数据同步已经是行业内标杆了。我们生产环境也用Canal监听binlog数据变更,然后解析成对应数据发送到MQ(RocketMQ)。一些非主流程业务,异步场景消费MQ处理即可。

但是我这篇文章,主要想聊一聊在做系统重构时,新老系统数据双向同步时Canal的使用场景。

注:关于系统重构的介绍我这里就不叙述了,大家可以看我之前写的系列文章:浅谈系统重构

二、关于双向同步
  1. 什么是双向同步?

所谓双向同步,就是老系统数据库数据往新系统数据库同步,新系统数据库同时也往老系统数据库同步。从而保证新系统,老系统数据库数据完全一致。系统重构时如果上线出现问题,随时能切回原来老系统,这也为灰度方案提供了底层保障。

  1. 一般同步如何做?各自优缺点是什么?

方案一: Dao层拦截方案

方案说明: 在Dao层打洞拦截所有写请求(insert,update,delete), 然后写入MQ队列,再通过消费MQ队列写入对应数据库。

优点: 这种方案实现比较简单。

缺点: 对于老系统数据库,可能有很多个服务在写入,如果从Dao层拦截,可能要修改很多地方,改动较大。

方案二: 利用Canal订阅解析Binlog

方案说明: 利用Canal订阅Binlog,解析成数据,再写入到对应数据库(这里可以直接写入,也可以先写入MQ,再消费MQ写入,推荐后者)。

优点:能够解决系统多处写入问题。

缺点:引入新的组件Canal,复杂度增加。

下面,我们就来实战操作一下方案二。

三、环境准备(Centos系统为例)1. 安装mysql
wget https://dev.mysql.com/get/mysql80-community-release-el8-1.noarch.rpm
yum install  mysql80-community-release-el8-1.noarch.rpm

#
禁用centos自带的mysql
yum module disable mysql -y
#安装
yum install mysql-community-server -y
#启动
systemctl start mysqld
#查看启动状态 提升 Active: active (running) 表示成功
systemctl status mysqld
#查看初始密码
grep \'temporary password\' /var/log/mysqld.log
#初始密码登录
mysql -uroot -p\'AXXXXX\'  -hlocalhost -P3306
#修改ROOT密码
ALTER USER \'root\'@\'localhost\' IDENTIFIED BY \'BXXXXX\';

2、 环境部署

1)、查看当前mysql是否开启了binlog模式, 如果log_bin的值为OFF是未开启,为ON是已开启 。

SHOW VARIABLES LIKE \'%log_bin%\'

2)、若未开启需要修改/ect/my.cnf 开启binlog模式

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

修改完之后重启mysql服务

3)、创建用户并且授权

create user canal@\'%\' IDENTIFIED by \'XXXX\';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO \'canal\'@\'%\';
FLUSH PRIVILEGES;

3、 Canal服务端安装

1)、canal下载地址

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

2)、解压到指定目录

mkdir  canal-server-1.1.4
tar -zxf canal.deployer-1.1.4.tar.gz -C canal-server-1.1.4/

3)、修改配置文件 查看主库 binlog position

mysql> show master status;
+---------------+----------+--------------+------------------+-------------------+
| File          | Position | Binlog_Do_DB | Binlog_Ignore_DB | Executed_Gtid_Set |
+---------------+----------+--------------+------------------+-------------------+
| binlog.000002 |     4526 |              |                  |                   |
+---------------+----------+--------------+------------------+-------------------+
1 row in set (0.00 sec)

修改配置文件 conf/example/instance.properties

# position info
canal.instance.master.address=IP:3306
# 这里对应上面的File
canal.instance.master.journal.name=binlog.000002
# 这里对应上面的Position
canal.instance.master.position=4526

#
 username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=XXXX

4)、启动 canal-server


./bin/startup.sh
# 查看日志
tail -f logs/example/example.log 

以上就完成了Canal-Server的单实例版本实现,生成环境集群环境一般是运维搭建,我们测试就用单实例版本。

关于Canal的HA机制设计下面简单介绍下,生产环境推荐使用。

canal的HA分为两部分,canal server和canal client分别有对应的HA实现


canal server:

为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.

canal client:

为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。整个HA机制的控制主要是依赖了zookeeper的几个特性,watcher和EPHEMERAL节点(和session生命周期绑定),这里就不展开介绍了,有兴趣的同学可以看下官方wiki。

四、演示环节

由于canal组件封装的代码太多,我花了几个晚上业余时间写的(请点个赞吧),代码已经开源至gitee,有需要的同学可以clone下来看。

gitee地址: https://gitee.com/bytearch/fast-cloud

目前已支持simple直连模式和zookeeper集群模式

下面演示canal-client-demo

  1. 新建库order_center ,并且创建表order_info

    CREATE TABLE `order_info` (
      `order_id` bigint(20unsigned NOT NULL,
      `user_id` int(11DEFAULT \'0\' COMMENT \'用户id\',
      `status` int(11DEFAULT \'0\' COMMENT \'订单状态\',
      `booking_date` datetime DEFAULT NULL,
      `create_time` datetime DEFAULT NULL,
      `update_time` datetime DEFAULT NULL,
      PRIMARY KEY (`order_id`),
      KEY `idx_user_id` (`user_id`),
      KEY `idx_bdate` (`booking_date`),
      KEY `idx_ctime` (`create_time`),
      KEY `idx_utime` (`update_time`)
    ENGINE=InnoDB DEFAULT CHARSET=utf8;
  2. 添加处理器handler

    @CanalHandler(value = "orderInfoHandler", destination = "example", schema = "order_center", table = "order_info", eventType = CanalEntry.EventType.UPDATE, CanalEntry.EventType.INSERT,CanalEntry.EventType.DELETE)
    public class OrderHandler implements Handler<CanalEntryBO

        @Override
        public boolean beforeHandle(CanalEntryBO canalEntryBO) 
            if (canalEntryBO == null
                return false;
            
            return true;
        

        @Override
        public void handle(CanalEntryBO canalEntryBO) 
            //1. 更新后数据解析
            OrderInfoDTO orderInfoDTO = CanalAnalysisUti.analysis(OrderInfoDTO.classcanalEntryBO.getRowData().getAfterColumnsList());
            System.out.println("event:" + canalEntryBO.getEventType());
            System.out.println(orderInfoDTO);
            //2. 后续操作 TODO
        


    1. 添加配置

      canal:
        clients:
          simpleInstance:
            enable: true
            mode: simple
            servers: XXXXX:11111
            batchSize: 1000
            destination: example
            getMessageTimeOutMS: 500
          #zkInstance:
          #   enable: true
          #   mode: zookeeper
          #   servers: 172.30.1.6:2181,172.30.1.7:2181,172.30.1.8:2181
          #   batchSize: 1000
          #   #filter: order_center.order_info
          #   destination: example
          #   getMessageTimeOutMS: 500

      配置说明:

      public class CanalProperties 
          /**
           * 是否开启 默认不开启
           */

          private boolean enable = false;
          /**
           * 模式
           * zookeeper: zk集群模式
           * simple: 简单直连模式
           */

          private String mode = "simple";

          /**
           * canal-server地址 多个地址逗号隔开
           */

          private String servers;

          /**
           * canal-server 的destination
           */

          private String destination;

          private String username = "";

          private String password = "";

          private int batchSize = 5 * 1024;

          private String filter = StringUtils.EMPTY;

          /**
           * getMessage & handleMessage 的重试次数, 最后一次重试会ack, 之前的重试会rollback
           */

          private int retries = 3;

          /**
           * getMessage & handleMessage 的重试间隔ms
           * canal-client内部代码 的重试间隔ms
           */

          private int retryInterval = 3000;

          private long getMessageTimeOutMS = 1000;

  1. 测试insert和update操作

    mysql> insert into order_info(order_id,user_id,status,booking_date,create_time,update_time) values(6666666,6,10,"2022-02-19 00:00:00","2022-02-19 00:00:00""2022-02-19 00:00:00");
    Query OK, 1 row affected (0.00 sec)

    mysql> update order_info set status=20 where order_id=66666;
    Query OK, 0 rows affected (0.00 sec)
    Rows matched: 0  Changed: 0  Warnings: 0

    mysql> 
  2. 测试结果

    2022-02-18 19:29:52.399  INFO 47706 --- [ lc-work-thread] c.b.s.canal.cycle.SimpleCanalLifeCycle   : 
    ****************************************************
    * Batch Id: [11] ,count : [3] , memsize : [189] , Time : 2022-02-18 19:29:52.399
    * Start : [binlog.000003:18893:1645183792000(2022-02-18 19:29:52.000)] 
    * End : [binlog.000003:19123:1645183792000(2022-02-18 19:29:52.000)] 
    ****************************************************

    2022-02-18 19:29:52.405  INFO 47706 --- [ lc-work-thread] c.b.s.canal.cycle.SimpleCanalLifeCycle   : 
    ----------------> binlog[binlog.000003:19056] , name[order_center,order_info] , eventType : INSERT ,tableName : order_info, executeTime : 1645183792000 , delay : 400ms

    event:INSERT
    OrderInfoDTOorderId=6666666, userId=6, status=10, bookingDate=2022-02-19 00:00:00, createTime=2022-02-19 00:00:00, updateTime=2022-02-19 00:00:00

    大功告成,到这一步就顺利完成了Canal订阅解析binlog步骤。

五、数据同步注意事项

抛下两个问题大家可以思考下

  1. 数据双向同步时,如何解决数据回环问题?

    例如新系统产生的数据,同步到老系统,不能又回流到新系统,如何解决?

  2. 数据顺序问题,如果写入到MQ,是否要保证顺序消费?如何实现?

  3. 当同步并发比较大,如何提高同步速度。

温馨提示: 此专题未完,以上问题我将在下一篇文章《系统重构数据同步利器之Canal实战篇-续》实现,大家可以提前思考一下。

六、 号外

欢迎大家关注”浅谈架构“ 公众号,不定期分享原创文章

有任何问题,欢迎私信我交流。


Canal增量数据同步利器介绍与安装

文章目录

Canal增量数据同步利器

Canal介绍

canal主要用途是基于 MySQL 数据库增量日志解析,并能提供增量数据订阅和消费,应用场景十分丰富。

参考官方文档:

Canal应用场景

  1. 电商场景下商品、用户实时更新同步到至Elasticsearch、solr等搜索引擎;
  2. 价格、库存发生变更实时同步到redis;
  3. 数据库异地备份、数据同步;
  4. 代替使用轮询数据库方式来监控数据库变更,有效改善轮询耗费数据库资源。

Canal原理

Canal的工作原理与MySQL主从复制原理类似,首先我们来回顾下MySQL主从复制的原理:

MySQL主从复制原理

  1. MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
  2. MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
  3. MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

Canal工作原理

Canal则是把自己伪装成 slave 节点与master进行交互

  1. canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
  2. MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
  3. canal 解析 binary log 对象(原始为 byte 流)

Canal安装

参考文档:https://github.com/alibaba/canal/wiki/QuickStart

步骤1:MySQL Bin-log开启

1)MySQL开启bin-log

a.进入mysql容器

docker exec -it -u root mysql /bin/bash

b.开启mysql的binlog

cd /etc/mysql/mysql.conf.d

在mysqld.cnf最下面添加如下配置

# 开启 binlog
log-bin=/var/lib/mysql/mysql-bin
# 选择 ROW 模式
binlog-format=ROW
# 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
server-id=12345

c.创建账号并授权

授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限, 如果已有账户可直接 grant:

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

d.重启mysql

docker restart mysql

开启bin-log后,我们可以用sql语句查看下:

show variables like '%log_bin%'

效果如下:

步骤2:Canal安装

1)拉取镜像

docker pull canal/canal-server:v1.1.1

2)安装容器

a.安装canal-server容器

docker run -p 11111:11111 --name canal -d docker.io/canal/canal-server

b.配置canal-server

修改/home/admin/canal-server/conf/canal.properties,将它的id属性修改成和mysql数据库中server-id不同的值,如下图:

c.修改/home/admin/canal-server/conf/example/instance.properties,配置要监听的数据库服务地址和监听数据变化的数据库以及表,修改如下:


指定监听数据库表的配置如下canal.instance.filter.regex

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\)
常见例子:

1.  所有表:.*   or  .*\\\\..*
2.  canal schema下所有表: canal\\\\..*
3.  canal下的以canal打头的表:canal\\\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用:canal\\\\..*,mysql.test1,mysql.test2 (逗号分隔)

注意:此过滤条件只针对row模式的数据有效(ps. mixed/statement因为不解析sql,所以无法准确提取tableName进行过滤)

3)重启canal:

docker restart canal

以上是关于系统重构数据同步利器之Canal实战篇的主要内容,如果未能解决你的问题,请参考以下文章

数据实时同步利器-Canal

实战 | MySQL Binlog通过Canal同步HDFS

ElasticSearch实战(四十七)-Canal 实现 MySQL 数据实时同步方案

实战 | canal 实现Mysql到Elasticsearch实时增量同步

ElasticSearch实战(四十七)-Canal 实现 MySQL 数据实时同步方案

实战!Spring Boot 整合 阿里开源中间件 Canal 实现数据增量同步!