canal 使用详解

Posted 在奋斗的大道

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了canal 使用详解相关的知识,希望对你有一定的参考价值。

第1章 Canal 简介

canal [kə'næl],译意为水道/管道/沟渠,主要用途是基于 mysql 数据库增量日志解析,提供增量数据订阅和消费

工作原理

  • canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送 dump 协议

  • MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )

  • canal 解析 binary log 对象(原始为 byte 流)

第2章 Canal 快速开始

环境准备

主机环境:Windows 11

数据库版本:MySQL-8数据库

Canal版本:canal.deployer-1.1.6

MySQL8准备

(1)检查MySQL 的binlog功能是否有开启

-- 是否开启binlog
show VARIABLES like 'log_bin';

(2)如果显示状态为OFF表示该功能未开启,开启binlog功能

1,修改 mysql 的配置文件 my.cnf
**/**/my.cnf
末尾追加内容:
#binlog文件名
log-bin=mysql-bin

#选择row模式
binlog_format=ROW

#mysql实例id,不能和canal的slaveId重复
server_id=1
2,windows 重启 mysql

(3)在mysql里面添加以下的相关用户和权限

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SHOW VIEW, SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO
'canal'@'%';
FLUSH PRIVILEGES;

Windows 安装Canal

下载地址

Canal Git 地址:https://github.com/alibaba/canal/releases

解压及配置

解压canal.deployer-1.1.6.tar.gz,我们可以看到里面有四个文件夹:

Canal 启动配置

(1)打开配置文件conf/example/instance.properties

#################################################
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000001
# binlog偏移量
canal.instance.master.position=913
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\\\..*,.*\\\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\\\..*
#################################################

知识点拓展:查看MySQL 的binlog日志名称 和binlog 偏移量

# 查看当前服务器使用的biglog文件及大小
show binary logs;

# 查看最新一个binlog日志文件名称和Position
show master status;

# 查看 binlog 日志列表
show master logs;

(2)Canal 启动

切换至Canal项目bin 文件夹(D:\\Canal\\canal.deployer-1.1.6\\bin),双击启动startup.bat

(3)查看Canal server 日志

切换至Canal项目logs/canal 文件夹

查看logs/canal/canal.log 日志内容

2023-02-06 15:45:55.188 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2023-02-06 15:45:55.193 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2023-02-06 15:45:55.198 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2023-02-06 15:45:55.358 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.43.80(192.168.43.80):11111]
2023-02-06 15:45:56.260 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

(4)查看Canal instance 日志

切换至Canal项目logs/example文件夹

2023-02-06 17:06:18.146 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
2023-02-06 17:06:18.148 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\\..*$
2023-02-06 17:06:18.148 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : ^mysql\\.slave_.*$
2023-02-06 17:06:18.148 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2023-02-06 17:06:18.201 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2023-02-06 17:06:18.316 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position mysql-bin.000001:4:1675666212000
2023-02-06 17:06:18.809 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000001,position=4,serverId=1,gtid=<null>,timestamp=1675666212000] cost : 608ms , the next step is binlog dump
2023-02-06 17:06:33.829 [MultiStageCoprocessor-other-example-0] WARN  com.taobao.tddl.dbsync.binlog.LogDecoder - Skipping unrecognized binlog event Unknown type:41 from: mysql-bin.000003:157
2023-02-06 17:06:48.835 [MultiStageCoprocessor-other-example-0] WARN  com.taobao.tddl.dbsync.binlog.LogDecoder - Skipping unrecognized binlog event Unknown type:41 from: mysql-bin.000003:157

(5)Canal 停止

直接关闭Canal 服务运行窗口即可。

第3章 Docker 安装Canal

第一步:查看本地镜像、检查Canal镜像和下载Canal 镜像

# 查看本地镜像

docker images

# 检索Kafka镜像

docker search canal

# 下载Kafka 镜像指定版本

docker pull canal/canal-server:latest

第二步:docker 启动Canal

docker run --name canal -d canal/canal-server

知识点拓展:拷贝Canal 容器内部配置文件拷贝到外部

语法:docker cp [容器索引]:[内部路径] [外部路径]

实例:

docker cp canal:/home/admin/canal-server/conf/canal.properties /home/canal
docker cp canal:/home/admin/canal-server/conf/example/instance.properties /home/canal

第三步:修改Canal 配置文件instance.properties

# 编辑配置文件
vi  /home/canal/instance.properties

编辑内容:

#################################################
## v1.0.26版本后会自动生成slaveId,所以可以不用配置
# canal.instance.mysql.slaveId=0

# enable gtid use true/false
canal.instance.gtidon=false

# 数据库地址
canal.instance.master.address=127.0.0.1:3306
# binlog日志名称
canal.instance.master.journal.name=mysql-bin.000001
# binlog偏移量
canal.instance.master.position=913
canal.instance.master.timestamp=
canal.instance.master.gtid=

# rds oss binlog
canal.instance.rds.accesskey=
canal.instance.rds.secretkey=
canal.instance.rds.instanceId=

# table meta tsdb info
canal.instance.tsdb.enable=true
#canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
#canal.instance.tsdb.dbUsername=canal
#canal.instance.tsdb.dbPassword=canal

#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#canal.instance.standby.gtid=

# username/password
canal.instance.dbUsername=root
canal.instance.dbPassword=123456
canal.instance.connectionCharset = UTF-8
# enable druid Decrypt database password
canal.instance.enableDruid=false
#canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

# table regex
canal.instance.filter.regex=.*\\\\..*
# table black regex
canal.instance.filter.black.regex=mysql\\\\.slave_.*
# table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch
# table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)
#canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

# mq config
canal.mq.topic=example
# dynamic topic route by schema or table regex
#canal.mq.dynamicTopic=mytest1.user,topic2:mytest2\\\\..*,.*\\\\..*
canal.mq.partition=0
# hash partition config
#canal.mq.enableDynamicQueuePartition=false
#canal.mq.partitionsNum=3
#canal.mq.dynamicTopicPartitionNum=test.*:4,mycanal:6
#canal.mq.partitionHash=test.table:id^name,.*\\\\..*
#################################################

第四步:基于Canal 外部配置文件,重新Canal 容器实例

docker run --name canal -p 11111:11111 -d -v /home/canal/instance.properties:/home/admin/canal-server/conf/example/instance.properties -v /home/canal/canal.properties:/home/admin/canal-server/conf/canal.properties canal/canal-server

可选指令:

  • 关闭Canal 容器

docker stop canal
  • 移除Canal 容器

docker rm canal

第4章 基于Canal 和Kafka,实现MySQL的Binlog 近实时同步

搭建一套可以用的组件需要部署MySQL、Zookeeper、Kafka和Canal四个中间件的实例。

Docker 安装MySQL

请参考:Docker 安装MySQL

CentOS-7安装ZooKeeper

Canal和Kafka集群都依赖于Zookeeper做服务协调,为了方便管理,一般会独立部署Zookeeper服务或者Zookeeper集群。

midkr /data/zk
# 创建数据目录
midkr /data/zk/data
cd /data/zk
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.6.0/apache-zookeeper-3.6.0-bin.tar.gz
tar -zxvf apache-zookeeper-3.6.0-bin.tar.gz
cd apache-zookeeper-3.6.0-bin/conf
cp zoo_sample.cfg zoo.cfg && vim zoo.cfg

把zoo.cfg文件中的dataDir设置为/data/zk/data,然后启动Zookeeper:

[root@localhost conf]# sh /data/zk/apache-zookeeper-3.6.0-bin/bin/zkServer.sh start
/usr/bin/java
ZooKeeper JMX enabled by default
Using config: /data/zk/apache-zookeeper-3.6.0-bin/bin/../conf/zoo.cfg
Starting zookeeper ... STARTED

注意一点,要启动此版本的Zookeeper服务必须本地安装好JDK8+。启动的默认端口是2181,启动成功后的日志如下:

CentOS-7 安装Kafka

Kafka是一个高性能分布式消息队列中间件,它的部署依赖于Zookeeper。笔者在此选用2.4.0并且Scala版本为2.13的安装包:

mkdir /data/kafka
mkdir /data/kafka/data
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.4.0/kafka_2.13-2.4.0.tgz
tar -zxvf kafka_2.13-2.4.0.tgz

解压后/data/kafka/kafka_2.13-2.4.0/config/server.properties配置中对应的zookeeper.connect=localhost:2181已经符合需要,不必修改,需要修改日志文件的存放目录log.dirs为/data/kafka/data。然后启动Kafka服务:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh /data/kafka/kafka_2.13-2.4.0/config/server.properties

知识拓展:kafka 后台进行运行设置

Kafka启动后一旦退出控制台就会结束Kafka进程,可以添加-daemon参数用于控制Kafka进程后台不挂断运行。

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-server-start.sh -daemon /data/kafka/kafka_2.13-2.4.0/config/server.properties

CentOS-7 安装Canal

CentOS 安装Canal 核心步骤

mkdir /data/canal
cd /data/canal
# 这里注意一点,Github在国内被墙,下载速度极慢,可以先用其他下载工具下载完再上传到服务器中
wget https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
tar -zxvf canal.deployer-1.1.6.tar.gz

Canal 解压后目录说明:

- bin   # 运维脚本
- conf  # 配置文件
  canal_local.properties  # canal本地配置,一般不需要动
  canal.properties        # canal服务配置
  logback.xml             # logback日志配置
  metrics                 # 度量统计配置
  spring                  # spring-实例配置,主要和binlog位置计算、一些策略配置相关,可以在canal.properties选用其中的任意一个配置文件
  example                 # 实例配置文件夹,一般认为单个数据库对应一个独立的实例配置文件夹
    instance.properties   # 实例配置,一般指单个数据库的配置
- lib   # 服务依赖包
- logs  # 日志文件输出目录

在开发和测试环境建议把logback.xml的日志级别修改为DEBUG方便定位问题。这里需要关注canal.properties和instance.properties两个配置文件。canal.properties文件中,需要修改:

  • 去掉canal.instance.parser.parallelThreadSize = 16这个配置项的注释,也就是启用此配置项,和实例解析器的线程数相关,不配置会表现为阻塞或者不进行解析。

  • canal.serverMode配置项指定为kafka,可选值有tcp、kafka和rocketmq(master分支或者最新的的v1.1.5-alpha-1版本,可以选用rabbitmq),默认是kafka。

  • canal.mq.servers配置需要指定为Kafka服务或者集群Broker的地址,这里配置为127.0.0.1:9092

canal.mq.servers在不同的canal.serverMode有不同的意义。
kafka模式下,指Kafka服务或者集群Broker的地址,也就是bootstrap.servers
rocketmq模式下,指NameServer列表
rabbitmq模式下,指RabbitMQ服务的Host和Port

本文Kafka实例配置

找到canal.deployer-1.1.6/conf目录下的canal.properties配置文件:

# tcp, kafka, RocketMQ 这里选择kafka模式
canal.serverMode = kafka
# 解析器的线程数,打开此配置,不打开则会出现阻塞或者不进行解析的情况
canal.instance.parser.parallelThreadSize = 16
# 配置MQ的服务地址,这里配置的是kafka对应的地址和端口
canal.mq.servers = 127.0.0.1:9092
# 配置instance,在conf目录下要有example同名的目录,可以配置多个
canal.destinations = example

其他配置项可以参考下面两个官方Wiki的链接:

instance.properties一般指一个数据库实例的配置,Canal架构支持一个Canal服务实例,处理多个数据库实例的binlog异步解析。instance.properties需要修改的配置项主要包括:

  • canal.instance.mysql.slaveId需要配置一个和Master节点的服务ID完全不同的值,这里笔者配置为654321。

  • 配置数据源实例,包括地址、用户、密码和目标数据库:

  • canal.instance.master.address,这里指定为127.0.0.1:3306。

  • canal.instance.dbUsername,这里指定为canal。

  • canal.instance.dbPassword,这里指定为QWqw12!@。

  • 新增canal.instance.defaultDatabaseName,这里指定为test(需要在MySQL中建立一个test数据库,见前面的流程)。

  • Kafka相关配置,这里暂时使用静态topic和单个partition:

  • canal.mq.topic,这里指定为test,也就是解析完的binlog结构化数据会发送到Kafka的命名为testtopic

  • canal.mq.partition,这里指定为0。

本文MySQL8实例配置

配置instance,找到/conf/example/instance.properties配置文件:

## mysql serverId , v1.0.26+ will autoGen(自动生成,不需配置)
# canal.instance.mysql.slaveId=0

# position info
canal.instance.master.address=127.0.0.1:3306
# 在Mysql执行 SHOW MASTER STATUS;查看当前数据库的binlog
canal.instance.master.journal.name=mysql-bin.000006
canal.instance.master.position=4596
# 账号密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=Canal@****
canal.instance.connectionCharset = UTF-8
#MQ队列名称
canal.mq.topic=canaltopic
#单队列模式的分区下标
canal.mq.partition=0

配置工作做好之后,可以启动Canal服务:

sh /data/canal/bin/startup.sh 
# 查看服务日志
tail -100f /data/canal/logs/canal/canal
# 查看实例日志  -- 一般情况下,关注实例日志即可
tail -100f /data/canal/logs/example/example.log

启动正常后,见实例日志如下:

数据演示

在test数据库创建一个订单表

use `test`;

CREATE TABLE `order`
(
    id          BIGINT UNIQUE PRIMARY KEY AUTO_INCREMENT COMMENT '主键',
    order_id    VARCHAR(64)    NOT NULL COMMENT '订单ID',
    amount      DECIMAL(10, 2) NOT NULL DEFAULT 0 COMMENT '订单金额',
    create_time DATETIME       NOT NULL DEFAULT CURRENT_TIMESTAMP COMMENT '创建时间',
    UNIQUE uniq_order_id (`order_id`)
) COMMENT '订单表';

INSERT INTO `order`(order_id, amount) VALUES ('20230207093012', 1999);
UPDATE `order` SET amount = 2000 WHERE order_id = '20230207093012';
DELETE  FROM `order` WHERE order_id = '20230207093012';

利用Kafka的kafka-console-consumer或者Kafka Tools查看test这个topic的数据:

sh /data/kafka/kafka_2.13-2.4.0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092 --from-beginning --topic test

第5章 基于Canal 和RabbitMQ,实现MySQL的Binlog 近实时同步

Docker 安装MySQL

请参考:Docker 安装MySQL

Docker 安装RabbitMQ

请参考:Docker 安装RabbitMQ

RabbitMQ 增加交换机和队列

  • 添加交换机 canal_exchange

  • 添加队列 canal_queue

  • 队列绑定交换机

CentOS-7 安装Canal

centos-7 解压安装Canal与上 一章节一致,顾不在做详细描述***。本章节重点讲解Canal 配置RabbitMQ 参数配置。

Canal Server配置

需要配置的东西就两项,一个是监听数据库配置,另一个是 RabbitMQ 连接配置。

instance.properties

监听数据库配置

cd /example 目录下

canal.properties

配置 Canal 服务方式为 RabbitMQ 和连接配置

进入到conf文件,打开canal.properties

serverMode(服务模式)修改为rabbitMQ,默认TCP.

RabbitMQ 服务相关参数设置。

第6章 Canal API

快速开始

第一步:maven 添加相关jar包依赖

<dependency>
    <groupId>com.alibaba.otter</groupId>
    <artifactId>canal.client</artifactId>
    <version>1.1.0</version>
</dependency>

第二步:编写main方法测试

import java.net.InetSocketAddress;
import java.util.List;


import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.common.utils.AddressUtils;
import com.alibaba.otter.canal.protocol.Message;
import com.alibaba.otter.canal.protocol.CanalEntry.Column;
import com.alibaba.otter.canal.protocol.CanalEntry.Entry;
import com.alibaba.otter.canal.protocol.CanalEntry.EntryType;
import com.alibaba.otter.canal.protocol.CanalEntry.EventType;
import com.alibaba.otter.canal.protocol.CanalEntry.RowChange;
import com.alibaba.otter.canal.protocol.CanalEntry.RowData;


public class SimpleCanalClientExample 


public static void main(String args[]) 
    // 创建链接
    CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),
                                                                                        11111), "example", "", "");
    int batchSize = 1000;
    int emptyCount = 0;
    try 
        connector.connect();
        connector.subscribe(".*\\\\..*");
        connector.rollback();
        int totalEmptyCount = 120;
        while (emptyCount < totalEmptyCount) 
            Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
            long batchId = message.getId();
            int size = message.getEntries().size();
            if (batchId == -1 || size == 0) 
                emptyCount++;
                System.out.println("empty count : " + emptyCount);
                try 
                    Thread.sleep(1000);
                 catch (InterruptedException e) 
                
             else 
                emptyCount = 0;
                // System.out.printf("message[batchId=%s,size=%s] \\n", batchId, size);
                printEntry(message.getEntries());
            

            connector.ack(batchId); // 提交确认
            // connector.rollback(batchId); // 处理失败, 回滚数据
        

        System.out.println("empty too many times, exit");
     finally 
        connector.disconnect();
    


private static void printEntry(List<Entry> entrys) 
    for (Entry entry : entrys) 
        if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) 
            continue;
        

        RowChange rowChage = null;
        try 
            rowChage = RowChange.parseFrom(entry.getStoreValue());
         catch (Exception e) 
            throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                                       e);
        

        EventType eventType = rowChage.getEventType();
        System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                                         entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                                         entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                                         eventType));

        for (RowData rowData : rowChage.getRowDatasList()) 
            if (eventType == EventType.DELETE) 
                printColumn(rowData.getBeforeColumnsList());
             else if (eventType == EventType.INSERT) 
                printColumn(rowData.getAfterColumnsList());
             else 
                System.out.println("-------&gt; before");
                printColumn(rowData.getBeforeColumnsList());
                System.out.println("-------&gt; after");
                printColumn(rowData.getAfterColumnsList());
            
        
    


private static void printColumn(List<Column> columns) 
    for (Column column : columns) 
        System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
    


第三步:启动Canal 服务

第四步: 运行Canal 客户端的main 方法,控制台输出如下信息:

empty count : 1
empty count : 2
empty count : 3
empty count : 4

含义:数据库无变更记录。

第五步:模拟数据库变更操作

mysql> use test;
Database changed
mysql> CREATE TABLE `xdual` (
    ->   `ID` int(11) NOT NULL AUTO_INCREMENT,
    ->   `X` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
    ->   PRIMARY KEY (`ID`)
    -> ) ENGINE=InnoDB AUTO_INCREMENT=3 DEFAULT CHARSET=utf8 ;
Query OK, 0 rows affected (0.06 sec)
mysql> insert into xdual(id,x) values(null,now());Query OK, 1 row affected (0.06 sec)

再次查看Canal 客户端,控制台输出信息:

empty count : 1
empty count : 2
empty count : 3
empty count : 4
================> binlog[mysql-bin.001946:313661577] , name[test,xdual] , eventType : INSERT
ID : 4    update=true
X : 2013-02-05 23:29:46    update=true

Canal API 文档说明

Canal 类设计

大致分为几部分:

  • ClientIdentity
    canal client和server交互之间的身份标识,目前clientId写死为1001. (目前canal server上的一个instance只能有一个client消费,clientId的设计是为1个instance多client消费模式而预留的,暂时不需要理会)

  • CanalConnector
    SimpleCanalConnector/ClusterCanalConnector : 两种connector的实现,simple针对的是简单的ip直连模式,cluster针对多ip的模式,可依赖CanalNodeAccessStrategy进行failover控制

  • CanalNodeAccessStrategy
    SimpleNodeAccessStrategy/ClusterNodeAccessStrategy:两种failover的实现,simple针对给定的初始ip列表进行failover选择,cluster基于zookeeper上的cluster节点动态选择正在运行的canal server.

  • ClientRunningMonitor/ClientRunningListener/ClientRunningData
    client running相关控制,主要为解决client自身的failover机制。canal client允许同时启动多个canal client,通过running机制,可保证只有一个client在工作,其他client做为冷备. 当运行中的client挂了,running会控制让冷备中的client转为工作模式,这样就可以确保canal client也不会是单点. 保证整个系统的高可用性.

javadoc查看:

Canal server/client交互协议

具体的网络协议格式,可参见:CanalProtocol.proto

Canal get/ack/rollback协议

get/ack/rollback协议介绍:

  • Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
    a. batch id 唯一标识
    b. entries 具体的数据对象,可参见下面的数据介绍

  • getWithoutAck(int batchSize, Long timeout, TimeUnit unit),相比于getWithoutAck(int batchSize),允许设定获取数据的timeout超时时间
    a. 拿够batchSize条记录或者超过timeout时间
    b. timeout=0,阻塞等到足够的batchSize

  • void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作

  • void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.

流式api设计的好处:

  • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)

  • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)

  • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性

  • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取

  • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cursor

  • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

流式api带来的异步响应模型:

数据对象格式简单介绍:EntryProtocol.proto

Entry  
    Header  
        logfileName [binlog文件名]  
        logfileOffset [binlog position]  
        executeTime [binlog里记录变更发生的时间戳,精确到秒]  
        schemaName   
        tableName  
        eventType [insert/update/delete类型]  
    entryType   [事务头BEGIN/事务尾END/数据ROWDATA]  
    storeValue  [byte数据,可展开,对应的类型为RowChange]  
RowChange

isDdl       [是否是ddl变更操作,比如create table/drop table]

sql         [具体的ddl sql]

rowDatas    [具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]

beforeColumns [Column类型的数组,变更前的数据字段]

afterColumns [Column类型的数组,变更后的数据字段]


Column

index

sqlType     [jdbc type]

name        [column name]

isKey       [是否为主键]

updated     [是否发生过变更]

isNull      [值是否为null]

value       [具体的内容,注意为string文本]

说明:

  • 可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全

  • 可以提供ddl的变更语句

  • insert只有after columns, delete只有before columns,而update则会有before / after columns数据.

快速开始代码剖析

1. 创建Connector

a. 创建SimpleCanalConnector (直连ip,不支持server/client的failover机制)

CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress(AddressUtils.getHostIp(),11111), destination, "", "");

b. 创建ClusterCanalConnector (基于zookeeper获取canal server ip,支持server/client的failover机制)

CanalConnector connector = CanalConnectors.newClusterConnector("10.20.144.51:2181", destination, "", "");

c. 创建ClusterCanalConnector (基于固定canal server的地址,支持固定的server ip的failover机制,不支持client的failover机制

CanalConnector connector = CanalConnectors.newClusterConnector(Arrays.asList(new InetSocketAddress(AddressUtils.getHostIp(),11111)), destination,"", "");

2. get/ack/rollback使用

3. RowData数据处理

第7章 Canal 适配器

基本说明

canal 1.1.1版本之后,提供了适配器功能,可将canal server的数据直接输出到目的地,不需要用户编写客户端。

温馨提示:特殊功能需求,还需要用户编写客户端实现

适配器整体结构

client-adapter分为适配器和启动器两部分,每个适配器会将自己所需的依赖打成一个包, 以SPI的方式让启动器动态加载。

启动器为 SpringBoot 项目, 支持canal-client启动的同时提供相关REST管理接口, 运行目录结构为:

- bin
    restart.sh
    startup.bat
    startup.sh
    stop.sh
- lib
   ...
- plugin 
    client-adapter.logger-1.1.1-jar-with-dependencies.jar
    client-adapter.hbase-1.1.1-jar-with-dependencies.jar
    ...
- conf
    application.yml
    - hbase
        mytest_person2.yml
- logs

以上目录结构最终会打包成 canal-adapter-*.tar.gz 压缩包

源码结构解析

launcher:启动器

logger:日志适配器

rdb:支持jdbc的关系型数据库适配器(mysql、oracle、postgress、sqlserver等)

hbase:hbase适配器

kudu:kudu适配器

Canal适配器之启动

Canal 适配器启动之配置文件application.yml

server:
  port: 8081
spring:
  jackson:
    date-format: yyyy-MM-dd HH:mm:ss
    time-zone: GMT+8
    default-property-inclusion: non_null

canal.conf:
  mode: tcp #tcp kafka rocketMQ rabbitMQ   #客户端消费模式,对应下面的consumerProperties
  flatMessage: true                        #是否以json字符串传递数据,仅对mq生效
  zookeeperHosts:                          #canal server集群部署时,创建curator客户端
                                           #tcp mode需要在consumerProperties tcp中设置
  syncBatchSize: 1000                      #每次同步的批数量
  retries: -1                              #重试次数,-1为无限次
  timeout:
  accessKey:
  secretKey:
  consumerProperties:
    # canal tcp consumer                   #canal adapter连接的canal server
    canal.tcp.server.host: 127.0.0.1:11111
    canal.tcp.zookeeper.hosts:
    canal.tcp.batch.size: 500
    canal.tcp.username:
    canal.tcp.password:
    # kafka consumer                      #canal adapter连接的kafka
    kafka.bootstrap.servers: 127.0.0.1:9092
    kafka.enable.auto.commit: false
    kafka.auto.commit.interval.ms: 1000
    kafka.auto.offset.reset: latest
    kafka.request.timeout.ms: 40000
    kafka.session.timeout.ms: 30000
    kafka.isolation.level: read_committed
    kafka.max.poll.records: 1000
    # rocketMQ consumer                  #canal adapter连接的rocketmq
    rocketmq.namespace:
    rocketmq.namesrv.addr: 127.0.0.1:9876
    rocketmq.batch.size: 1000
    rocketmq.enable.message.trace: false
    rocketmq.customized.trace.topic:
    rocketmq.access.channel:
    rocketmq.subscribe.filter:
    # rabbitMQ consumer                     #canal adapter连接的rabbitmq
    rabbitmq.host:
    rabbitmq.virtual.host:
    rabbitmq.username:
    rabbitmq.password:
    rabbitmq.resource.ownerId:

#  srcDataSources:
#    defaultDS:
#      url: jdbc:mysql://127.0.0.1:3306/mytest?useUnicode=true
#      username: root
#      password: 121212
  canalAdapters:
  - instance: example # canal instance Name or mq topic name
                      # 注意:instance name、topic name不支持通配符匹配
    groups:
    - groupId: g1      #一份数据可被多个groupId消费
                       #不同groupId并发执行,
                       #同一groupId内的adapters顺序执行
      outerAdapters:
      - name: logger   #输出到日志
#      - name: rdb     #输出到rdb(关系型数据库
#        key: mysql1   #输出到mysql数据库
#        properties:
#          jdbc.driverClassName: com.mysql.jdbc.Driver
#          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
#          jdbc.username: root
#          jdbc.password: 121212
#          druid.stat.enable: false
#          druid.stat.slowSqlMillis: 1000
#      - name: rdb
#        key: oracle1      #输出到oracle数据库
#        properties:
#          jdbc.driverClassName: oracle.jdbc.OracleDriver
#          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
#          jdbc.username: mytest
#          jdbc.password: m121212
#      - name: rdb
#        key: postgres1     #输出到postgress数据库
#        properties:
#          jdbc.driverClassName: org.postgresql.Driver
#          jdbc.url: jdbc:postgresql://localhost:5432/postgres
#          jdbc.username: postgres
#          jdbc.password: 121212
#          threads: 1
#          commitSize: 3000
#      - name: hbase          #输出到hbase
#        properties:
#          hbase.zookeeper.quorum: 127.0.0.1
#          hbase.zookeeper.property.clientPort: 2181
#          zookeeper.znode.parent: /hbase
#      - name: es            #输出到es
#        hosts: 127.0.0.1:9300 # 127.0.0.1:9200 for rest mode
#        properties:
#          mode: transport # or rest
#          # security.auth: test:123456 #  only used for rest mode
#          cluster.name: elasticsearch
#      - name: kudu       #输出到kudu
#        key: kudu
#        properties:
#          kudu.master.address: 127.0.0.1 # ',' split multi address
#      - name: phoenix     #输出到phoenix
#        key: phoenix
#        properties:
#          jdbc.driverClassName: org.apache.phoenix.jdbc.PhoenixDriver
#          jdbc.url: jdbc:phoenix:127.0.0.1:2181:/hbase/db
#          jdbc.username:
#          jdbc.password:

Canal适配器实例

业务需求:商品表新增商品数据实时同步ES搜索引擎。

解决方案:基于Canal监听MySQL-binlog 日志信息变化,通过Canal-Adapter 读取Canal 数据变更记录,同时写入ES搜索引擎。

1、软件版本

MySQL:8.x
canal:1.1.6
adapter:1.1.6
elasticsearch:7.4.2

2、MySQL开启binlog

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
# 是否开启binlog
show variables like 'log_bin';
# 结果
log_bin    ON

# binlog模式
show variables like 'binlog_format';
# 结果
binlog_format    ROW
# 创建用户canal及密码设置
CREATE USER canal IDENTIFIED BY 'canal';  
# 赋权
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
# 刷新生效
FLUSH PRIVILEGES;

3、下载canal及adapter

# canal-server
https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.deployer-1.1.6.tar.gz
# canal-adapter
https://github.com/alibaba/canal/releases/download/canal-1.1.6/canal.adapter-1.1.6.tar.gz

4、编辑canal配置文件

vi conf/example/instance.properties

此处只展示修改的配置

# 伪装成从库的slaveId,不能与MySQL重复
canal.instance.mysql.slaveId=1234
# 数据库的ip:端口
canal.instance.master.address=127.0.0.1:3306
# 数据库用户名密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal

5、启动canal

cd bin
sh startup.sh

如遇到如下报错

OpenJDK 64-Bit Server VM warning: ignoring option PermSize=96m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: ignoring option MaxPermSize=256m; support was removed in 8.0
OpenJDK 64-Bit Server VM warning: UseCMSCompactAtFullCollection is deprecated and will likely be removed in a future release.

The stack size specified is too small, Specify at least 384k
Error: Could not create the Java Virtual Machine.
Error: A fatal exception has occurred. Program will exit.

解决办法:调整startup.sh脚本的-Xss参数

vi bin/startup.sh
# 我这里调整到-Xss512k
if [ -n "$str" ]; then
JAVA_OPTS="-server -Xms2048m -Xmx3072m -Xmn1024m -XX:SurvivorRatio=2 -XX:PermSize=96m -XX:MaxPermSize=256m -Xss512k -XX:-UseAdaptiveSizePolicy -XX:MaxTenuringThreshold=15 -XX:+DisableExplicitGC -XX:+UseConcMarkSweepGC -XX:+CMSParallelRemarkEnabled -XX:+UseCMSCompactAtFullCollection -XX:+UseFastAccessorMethods -XX:+UseCMSInitiatingOccupancyOnly -XX:+HeapDumpOnOutOfMemoryError"
else
JAVA_OPTS="-server -Xms1024m -Xmx1024m -XX:NewSize=256m -XX:MaxNewSize=256m -XX:MaxPermSize=128m "

6、编辑adapter配置文件

cd conf
vi application.yml

此处只展示修改的配置

canal.conf:
  consumerProperties:
      # 单机配置属性
      # canal.tcp.server.host: 127.0.0.1:11111
  # 此配置数据库信息与canal-server配置的数据库信息相同
  srcDataSources:
      defaultDs:
        url: jbdc:mysql://127.0.0.1:3306/canal_test
        username: canal
        password: canal
  # 配置 ES信息
  canalAdapters:
    groups:
      outerAdapters: 
          - name: logger
          - name: es7
            hosts: http://127.0.0.1:9200
            properties:
                mode: rest
                security.auth: es账号:es密码
                cluster.name: es的名字

7、编辑es7 索引配置文件

cd conf/es7/
cp mytest_user.yml canal_test_order.yml
rm biz_order.yml customer.yml mytest_user.yml
vi canal_test_order.yml


dataSourceKey: defaultDS
destination: example
groupId: g1
esMapping:
  _index: canal_test_order
  _id: _id
# 这个必须要加,源文件没有
  _type: _doc
  upsert: true
#  pk: id
  sql: "select
        a.id as _id,
        a.order_no as orderNo,
        a.order_name as orderName
        from t_order a"
#  objFields:
#    _labels: array:;
  etlCondition: "where a.c_time>="
  commitBatch: 3000

8、启动Canal adapter

cd bin
sh startup.sh

查看Adapter 日志记录出现如下错误时:

java.lang.RuntimeException: java.lang.RuntimeException: java.lang.ClassCastException: com.alibaba.druid.pool.DruidDataSource cannot be cast to com.alibaba.druid.pool.DruidDataSource
    at com.alibaba.otter.canal.client.adapter.es7x.ES7xAdapter.init(ES7xAdapter.java:54) ~[client-adapter.es7x-1.1.5-jar-with-dependencies.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.loadAdapter(CanalAdapterLoader.java:225) [client-adapter.launcher-1.1.5.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterLoader.init(CanalAdapterLoader.java:56) [client-adapter.launcher-1.1.5.jar:na]
    at com.alibaba.otter.canal.adapter.launcher.loader.CanalAdapterService.init(CanalAdapterService.java:60) [client-adapter.launcher-1.1.5.jar:na]
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) ~[na:1.8.0_322]
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) ~[na:1.8.0_322]
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) ~[na:1.8.0_322]
    at java.lang.reflect.Method.invoke(Method.java:498) ~[na:1.8.0_322]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleElement.invoke(InitDestroyAnnotationBeanPostProcessor.java:365) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor$LifecycleMetadata.invokeInitMethods(InitDestroyAnnotationBeanPostProcessor.java:308) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.annotation.InitDestroyAnnotationBeanPostProcessor.postProcessBeforeInitialization(InitDestroyAnnotationBeanPostProcessor.java:135) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]
    at org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory.applyBeanPostProcessorsBeforeInitialization(AbstractAutowireCapableBeanFactory.java:422) [spring-beans-5.0.5.RELEASE.jar:5.0.5.RELEASE]

解决办法,更改源码

下载canal-adapter源码

修改client-adapter/escore/pom.xml为

原
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
        </dependency>

改成
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>druid</artifactId>
            <scope>provided</scope>
        </dependency>

重新打包编译生成client-adapter.es7x-1.1.5-jar-with-dependencies.jar

放入canal-adapter的plugin目录下,替换原jar

重新启动,Canal Adapter 日志如下

9、测试

在MySQL手动插入一条数据

adapter.log打印日志如下

2023-02-09 15:20:25.519 [pool-2-thread-1] INFO  c.a.o.canal.client.adapter.logger.LoggerAdapterExample - DML: "data":["id":7,"order_no":1122,"order_name":"2211"],"database":"canal_test","destination":"example","es":1664090425000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t_order","ts":1664090425518,"type":"INSERT"
2023-02-09 15:20:25.520 [pool-2-thread-1] DEBUG c.a.o.canal.client.adapter.es.core.service.ESSyncService - DML: "data":["id":7,"order_no":1122,"order_name":"2211"],"database":"canal_test","destination":"example","es":1664090425000,"groupId":"g1","isDdl":false,"old":null,"pkNames":["id"],"sql":"","table":"t_order","ts":1664090425518,"type":"INSERT" 
Affected indexes: canal_test_order 

查看elasticsearch数据

# get 127.0.0.1:9200/canal_test_order/_search


    "took": 0,
    "timed_out": false,
    "_shards": 
        "total": 1,
        "successful": 1,
     

以上是关于canal 使用详解的主要内容,如果未能解决你的问题,请参考以下文章

canal实时同步mysql数据到elasticsearch(部署,配置,测试)

使用canal将mysql同步到es中

使用canal解决Mysql和ElasticSearch数据同步问题

mysql的数据表同步工具 canal的使用

使用Canal中间件同步MySql数据到ElasticSearch

数据实时同步利器-Canal