再学Canal

Posted Shi Peng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了再学Canal相关的知识,希望对你有一定的参考价值。

一、Canal简介

canal 是用于解析 mysql 增量日志,提供增量数据订阅。

日志增量订阅的业务包括:
1)数据库实时备份
2)索引构建和实时维护(拆分异构索引、倒排索引等)
3)数据库更新后, cache 实时刷新
4)带业务逻辑的增量数据处理

1.1、Canal官方架构图

1.3、Canal原理

Canal的原理基于MySQL主备复制原理

1)MySQL master 将数据变更写入二进制日志( binary log, 其中记录叫做二进制日志事件binary log events,可以通过 show binlog events 进行查看)
2)MySQL slave 将 master 的 binary log events 拷贝到它的中继日志(relay log)
3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

canal 工作原理
1)canal 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
2)MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal Server)
3)canal 解析 binary log 对象(原始为 byte 流)

1.3、Canal的客户端多语言支持方式

方式1:多语言客户端

Canal支持 Server-Client 模式
Canal提供了多语言的客户端,用于读取Canal解析后的增量日志。

方式2:借助于MQ的多语言客户端

Canal也支持把增量日志的变更投递到MQ,借助于MQ的多语言客户端来实现多语言消费Canal。

二、Canal Server架构设计详解

2.1、MySQL的binlog简介

1)mysql的binlog是多文件存储
2)通过 binlog filename + binlog position 定位一个LogEvent

mysql的binlog数据格式,按照生成的方式,主要分为:statement-based、row-based、mixed。

mysql> show variables like 'binlog_format';
+---------------+-------+
| Variable_name | Value |
+---------------+-------+
| binlog_format | ROW   |
+---------------+-------+
1 row in set (0.00 sec)

目前canal支持所有模式的增量订阅(但配合同步时,因为statement只有sql,没有数据,无法获取原始的变更日志,所以一般建议为ROW模式)

2.2、Canal Server的架构图


1、server代表一个canal运行实例,对应于一个JVM
2、instance对应于一个数据队列 (1个server对应n个instance)

对于每个instance,包含的模块:
1)eventParser :数据源接入,模拟slave协议和master进行交互,协议解析
2)eventSink :Parser和Store链接器,进行数据过滤,加工,分发的工作
3)eventStore :数据存储,供client消费
4)metaManager :增量订阅&消费信息管理器

2.2.1、EventParser设计


Parser执行步骤如下:
1)Connection前先获取上一次解析成功的位置 (如果第一次启动,则获取初始指定的位置或者是当前数据库的binlog位点)
2)Connection建立链接,发送BINLOG_DUMP指令
3)MySQL开始推送Binaly Log给Canal
4)Canal接收到binlog后,通过parser进行协议解析:补充一些特定信息,如补充字段名字,字段类型,主键信息,unsigned类型处理
5)传递给EventSink模块进行数据存储,是一个串行阻塞操作,直到存储成功,再处理下一条
6)存储成功后,定时记录Binaly Log位置

总结:Parser模块从MySQL拉取增量日志时,也是要传递上一次拉取成功的offset作为参数的。

MySQL binlog的发送接收流程:

Step1:通过HandShake协议进行Client和DB的握手认证
Step2:握手成功以后,Client对DB发送show master status命令,此命令中回带回当前最新binlog存储在哪个文件,以及对应哪个偏移量。如果想从当前开始接收binglog,则在后面发送binlog dump命令的时候用这两个值就好。
Step3:发送show global variables like 'binlog_checksum’命令,这是由于binlog event发送回来的时候需要,在最后获取event内容的时候,会增加4个额外字节做校验用。mysql5.6.5以后的版本中binlog_checksum=crc32,而低版本都是binlog_checksum=none。如果不想校验,可以使用set命令设置set binlog_checksum=none
Step4:向MySQL发送Dump命令

Dump命令包图:

如上图所示,在报文中塞入binlogPosition和binlogFileName即可让master从相应的位置发送binlog event。

一旦发送了BinlogDump命令,master就会在数据库有变化的源源不断的推送binlog event到client。
binlog的类型有三种:
1)Statement:每一条会修改数据的sql都会记录在binlog中。
2)Row:不记录sql语句上下文相关信息,仅保存哪条记录被修改。
3)Mixedlevel:以上两种Level的混合。

2.2.2、EventSink设计


1、数据过滤:支持通配符的过滤模式,表名,字段内容等
2、数据路由/分发:解决1:n (1个parser对应多个store的模式)
3、数据归并:解决n:1 (多个parser对应1个store)
4、数据加工:在进入store之前进行额外的处理,比如join

2.2.2.1、数据1:n业务

为了合理的利用数据库资源, 一般常见的业务都是按照schema进行隔离,然后在mysql上层或者dao这一层面上,进行一个数据源路由,屏蔽数据库物理位置对开发的影响,阿里系主要是通过cobar/tddl来解决数据源路由问题。

所以,一般一个数据库实例上,会部署多个schema,每个schema会有由1个或者多个业务方关注

总结:parser和store为1:n,对于nebula而言,可以是一个storage的wal对应多个业务消费,即多套store,每个业务的过滤规则都不同。

2.2.2.2、数据n:1业务

同样,当一个业务的数据规模达到一定的量级后,必然会涉及到水平拆分和垂直拆分的问题,针对这些拆分的数据需要处理时,就需要链接多个store进行处理,消费的位点就会变成多份,而且数据消费的进度无法得到尽可能有序的保证。

所以,在一定业务场景下,需要将拆分后的增量数据进行归并处理,比如按照时间戳/全局id进行排序归并.

总结:如果数据库数据量非常大,经历了拆库拆表后,那么为了提高消费速度,可以由多个parser并行处理,但这样会导致数据乱序,所以后续再重新做归并排序。

2.2.3、EventStore设计

1、目前仅实现了Memory内存模式,后续计划增加本地file存储,mixed混合模式
2、借鉴了Disruptor的RingBuffer的实现思路

要注意的是,如果生成速度大于消费速度时,以上图为例,如果生产者想要写入 Ring Buffer 中序号 3 占据的节点,因为它是 Ring Buffer 当前游标的下一个节点。但是 ProducerBarrier 明白现在不能写入,因为有一个消费者正在占用它。所以,ProducerBarrier 停下来自旋 (spins),等待,直到那个消费者离开。

定义了3个cursor:
1)Put : Sink模块进行数据存储的最后一次写入位置
2)Get : 数据订阅获取的最后一次提取位置
3)Ack : 数据消费成功的最后一次消费位置

2.2.4、Instance设计


可以通过配置文件或者console这两种方式二选一在设置一个server启动几个instance。
instance代表了一个实际运行的数据队列,包括了EventPaser,EventSink,EventStore等组件。

2.2.5、Server设计


server代表了一个canal的运行实例,为了方便组件化使用,特意抽象了Embeded(嵌入式) / Netty(网络访问)的两种实现:
1)Embeded(嵌入式):对latency和可用性都有比较高的要求,自己又能hold住分布式的相关技术(比如failover)-- 例如,可以把Canal Server跟数据库部署在相同的服务器上,但当Canal Server的instance实例故障时,需要自己包装failover,如果可以一个instance对应一个套增量日志,那么当instance故障时,可以通过运维命令自动拉起。
2)Netty(RPC框架):基于netty封装了一层网络协议,由canal server保证其可用性,采用的pull模型,当然latency会稍微打点折扣,不过这个也视情况而定。(阿里系的notify和metaq,典型的push/pull模型,目前也逐步的在向pull模型靠拢,push在数据量大的时候会有一些问题)。

2.2.6、增量订阅设计


具体的协议格式,可参见:https://github.com/alibaba/canal/blob/master/protocol/src/main/java/com/alibaba/otter/canal/protocol/CanalProtocol.proto

get/ack/rollback协议介绍:
1)Message getWithoutAck(int batchSize),允许指定batchSize,一次可以获取多条,每次返回的对象为Message,包含的内容为:
a. batch id 唯一标识
b. entries 具体的数据对象,对应的数据对象格式:https://github.com/alibaba/canal/blob/master/protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto

2)void rollback(long batchId),顾命思议,回滚上次的get请求,重新获取数据。基于get获取的batchId进行提交,避免误操作

3)void ack(long batchId),顾命思议,确认已经消费成功,通知server删除数据。基于get获取的batchId进行提交,避免误操作

canal的get/ack/rollback协议和常规的jms协议有所不同,允许get/ack异步处理,比如可以连续调用get多次,后续异步按顺序提交ack/rollback,项目中称之为流式api.
流式api设计的好处:

  • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
  • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化. (作者在实际业务中的一个case:业务数据消费需要跨中美网络,所以一次操作基本在200ms以上,为了减少延迟,所以需要实施并行化)
    流式API设计:
  • 每次get操作都会在meta中产生一个mark,mark标记会递增,保证运行过程中mark的唯一性
  • 每次的get操作,都会在上一次的mark操作记录的cursor继续往后取,如果mark不存在,则在last ack cursor继续往后取
  • 进行ack时,需要按照mark的顺序进行数序ack,不能跳跃ack. ack会删除当前的mark标记,并将对应的mark位置更新为last ack cusor
  • 一旦出现异常情况,客户端可发起rollback情况,重新置位:删除所有的mark, 清理get请求位置,下次请求会从last ack cursor继续往后取

2.2.7、数据对象格式

数据对象格式:https://github.com/alibaba/canal/blob/master/protocol/src/main/java/com/alibaba/otter/canal/protocol/EntryProtocol.proto

每次调用get, 访问master的增量日志,返回数据格式如下:

Entry
	Header
		logfileName [binlog文件名]
		logfileOffset [binlog position]
		executeTime [binlog里记录变更发生的时间戳]
		schemaName [数据库实例]
		tableName [表名]
		eventType [insert/update/delete类型]
	entryType 	[事务头BEGIN/事务尾END/数据ROWDATA]
	storeValue 	[byte数据,可展开,对应的类型为RowChange]
RowChange
isDdl		[是否是ddl变更操作,比如create table/drop table]
sql		[具体的ddl sql]
rowDatas	[具体insert/update/delete的变更数据,可为多条,1个binlog event事件可对应多条变更,比如批处理]
beforeColumns [Column类型的数组]
afterColumns [Column类型的数组]


Column
index		[column序号]
sqlType		[jdbc type]
name		[column name]
isKey		[是否为主键]
updated		[是否发生过变更]
isNull		[值是否为null]
value		[具体的内容,注意为文本]

说明:
1)可以提供数据库变更前和变更后的字段内容,针对binlog中没有的name,isKey等信息进行补全
2)可以提供ddl的变更语句

2.2.8、HA机制设计

canal的ha分为两部分,canal server和canal client分别有对应的ha实现

1)canal server: 为了减少对mysql dump的请求,不同server上的instance要求同一时间只能有一个处于running,其他的处于standby状态.
2)canal client: 为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。


1、canal server要启动某个canal instance时都先向zookeeper进行一次尝试启动判断 (实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
2、创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态
3、一旦zookeeper发现canal server A创建的节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance.
4、canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect.

Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制.

以上是关于再学Canal的主要内容,如果未能解决你的问题,请参考以下文章

canal+Kafka实现mysql与redis数据同步

canal+Kafka实现mysql与redis数据同步

canal+Kafka实现mysql与redis数据同步

canal+Kafka实现mysql与redis数据同步

canal +RocketMQ实现MySQL与ElasticSearch数据同步

canal