实战 | MySQL Binlog通过Canal同步HDFS

Posted 大数据技术与架构

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实战 | MySQL Binlog通过Canal同步HDFS相关的知识,希望对你有一定的参考价值。




点击右侧关注,大数据开发领域最强公众号!

点击右侧关注,暴走大数据!

之前 介绍性的文章简单介绍了实时同步mysql到hdfs的几种方案,本篇主要记录下利用canal同步mysql到hdfs的具体方案。
本文来自:http://bigdatadecode.club/MysqlToHDFSWithCanal.html

canal server 部署
在canal中一个mysql实例对应一个配置文件,配置文件放在conf目录下的一个文件夹中,该文件夹的名字就代表了mysql实例。结构如下
 
   
   
 
-rwxr-xr-x 1 dc user 2645 Jul 18 14:25 canal.properties
-rwxr-xr-x 1 dc user 2521 Jul 17 18:31 canal.properties.bak
-rwxr-xr-x 1 dc user 3045 Jul 17 18:31 logback.xml
drwxr-xr-x 2 dc user 4096 Jul 17 18:38 spring
drwxr-xr-x 2 dc user 4096 Jul 19 11:55 trans1
trans1代表一个mysql实例,该文件夹中有个instance.properties文件,在里面配置mysql数据库的信息。
 
   
   
 
## mysql serverId 部署ha的话,slaveId不能重复
canal.instance.mysql.slaveId = 1235
canal.instance.master.address = 10.172.152.66:3306
# username/password
canal.instance.dbUsername = root
canal.instance.dbPassword = root
# 采集表的正则
canal.instance.filter.regex = .*\\..*

canal server HA部署

采用canal的HA模式,canal的HA是依赖zk来实现的。
修改配置文件,配置文件是conf目录下大canal.properties
 
   
   
 
# common argument
canal.id= 1 # 另一台 canal.id= 2
canal.ip= 10.172.152.66 # 另一台 canal.ip= 10.172.152.124
canal.port= 11111
canal.zkServers= 10.172.152.66:2181,10.172.152.124:2181,10.172.152.125:2181
# conf目录下mysql实例的文件名
canal.destinations= trans1
# 使用ha必须使用default-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
canal的部署比较简单,在instance.properties也设置从哪个binlog某个位置处开始读(未测)。
下面看下client的设计,由于canal并没有继承client只是提供了一套client的api由用户自己去实现,则这里重点记录下client的设计。

canal client 功能设计

client的主要功能是与canal server的某个destinations建立连接消费订阅的binlog信息,并将binlog进行解析落地到存储系统中。

client消费原理

client api中有ack和rollback机制,保证了数据不丢失。
ack机制采用异步确认,也就是可以连续调用get多次,后续异步按顺序提交ack/rollback,这种机制在canal中称为流式 api设计。
流式api设计的好处:
  • get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)

  • get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化。

数据格式及使用

client将binlog中的信息解析成json,json格式分为两种,一种是DML,主要包括insert、update和delete,另一种是DDL,主要包括create、alter和drop。
json格式如下:
// DML-json
{
  "database":"test",
  "table":"e",
  "type":"delete/insert/update",
  "executeTime":"1501645930000",
  "consumerTime":"1501645930000",
  "data":[
  {"id":"1","num":"1"},
  {"id":"2","num":"2"}
  ]
}
// DDL-json
{
  "database":"test",
  "table":"e",
  "type":"create/alter/drop",
  "executeTime":"1501645930000",
  "consumerTime":"1501645930000",
  "isDDL":"true/false",
  "sql":"create table e(id int)"
}

之所以分为两个json格式,是因为想把 DML-json里的数据直接格式化作为当天的增量使用,而DDL-json则需要解析成hql,个人建议在执行时需要人工check,DDL-json每天应该不会太多,初期人工check的压力不大,等程序成熟之后可以不用人工check。
解析DML-json的时候需要给数据新增一列来标识数据的状态,数据的状态是指数据是否被删除。

配置

方便程序的移植性将一些参数提出,作为配置文件,由程序动态周期性的加载。
ZK_HOSTS = # canal server ha的zk地址
DESTINATION =  # 要消费的mysql实例
FILTER_REGEX =  # 订阅的表信息的正则
BATCH_SIZE =  # 批量获取的条数
DML_PATH =
DDL_PATH =
DELAY_TIME =  # 延迟报警的阈值,单位ms

多线程异步处理数据

client api提供get/ack/rollback接口,为了提高消费效率,采用异步ack的机制。
设计一个一定大小的 ackQueue,get不断的获取数据,将message交给新线程处理并将 batchId放入ackQueue中,待新线程处理完message之后进行ack确认,从ackQueue中取出batchId按顺序确认,如遇到异常进行回滚。
伪代码如下:
 
   
   
 
try{
while(true){
Message message = connector.getWithoutAck(BATCH_SIZE); // 获取指定数量的数据
batchId = message.getId();
ackQueue.put(batchId);
executorService.submit(new Runnable() {
public void run() {
// 需要一个线程和文件的映射,防止多个线程写同一个文件
parseMessage(message);
// 判断batchId是否和ackQueue中取得的batchId一致,大于则等待小于报异常
ackMessage(batchId);
}
});
}
}catch (Exception e){
connector.rollback(batchId);
}finally {
...
}

数据归档

由于目前的使用场景对实时要求不高,而是离线使用,则建议将数据直接写入本地文件系统,然后批量上传至HDFS。
这样既可以提高写的效率又可以减少对hdfs的操作,并且在上传hdfs时可以对数据进行合并,从源头上减少小文件的生成。
数据归档方案:
  • 数据文件切分可以按照持有一个文件句柄的时间来进行切分并且到零点统一关闭所有句柄。

  • 使用binlog中的executeTime进行文件切分,保证数据归档的时间准备性。

跨网络传输

  • 北京的数据向杭州传输采用两种方案:

  • client将数据写入本地,然后通过rsync传输到杭州服务器

  • client调用Avro rpc将数据写入杭州的flume agent的Avro source中,通过fileChannel将数据写入杭州服务器。


对比:
方案1使用rsync进行数据传输,简单方便只开一次接口权限。而且client和杭州的client一致,不需要额外的开发。
方案2由于使用rpc传输则每个client对应一个port,需要多次申请port权限,同样在杭州的机器上也需要开通port权限,这样暴露的port较多,而且需要运维开通权限。
由于一个mysql实例对应一个client,则会需要多个port进行数据传输。
建议rsync同步 

监控报警

监控主要是监控消费延迟,判断消费延迟的依据是处理当前message的时间和该message在binlog中的executeTime的差值,大于设置的阈值则认为消费滞后,进行报警。
也可以判断一个时间窗口中两个时间点差值进行是否消费滞后的判断。
-- MORE | 更多精彩文章 --


如何高效学习分布式算法?


如果你留心观察,会发现有不少人看了很多资料和书籍,涉及到具体问题,照样一头雾水,比如:


•  拜占庭错误是怎么回事?为什么区块链用拜占庭容错算法?Paxos 算法不行吗?能黑比特币吗?

•  想要实现数据副本的一致性,到底该选 Paxos 算法,还是 Raft 算法?

•  为什么我的集群接入性能低?稍微出现峰值流量,为什么业务就基本不可用了?

•  如何设计分布式系统架构呢?那么多算法,Paxos、Raft、Gossip、Nuorum NWR、PBFT 等等,究竟该选择哪个?

 

其实,算法相对抽象,即使是非常经典的论文,也有一些关键细节没有交代清楚。而网上的信息大多是“复制粘贴”的结果,甚至有不少错误,给自主学习带来了很多障碍和误导。

 

在我看来,要掌握这部分内容,不仅要理解常用算法的原理、特点和局限性,还要根据场景特点选择适合的分布式算法

 

刚好,极客时间上线了一个新专栏《分布式协议与算法实战》,作者是腾讯资深工程师韩健,我有幸提前看到了一部分内容,很想推荐给你。

 

在专栏中,他分享了自己支撑海量互联网服务中的分布式算法实战心得,让你学完就能在工作中根据场景特点,灵活地设计架构和运用分布式算法,开发出适合该场景的分布式系统,对架构设计的理解,也会更上一层楼。

以上是关于实战 | MySQL Binlog通过Canal同步HDFS的主要内容,如果未能解决你的问题,请参考以下文章

docker环境安装mysqlcanalelasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中

flinksql从kafka中消费mysql的binlog日志

利用Canal解析mysql binlog日志

监听MySQL的binlog日志工具分析:Canal

canal+kafka订阅Mysql binlog将数据异构到elasticsearch(或其他存储方式)

springboot整合canal,监听MySQL binlog日志,实现增量同步