实战 | MySQL Binlog通过Canal同步HDFS
Posted 大数据技术与架构
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了实战 | MySQL Binlog通过Canal同步HDFS相关的知识,希望对你有一定的参考价值。
-rwxr-xr-x 1 dc user 2645 Jul 18 14:25 canal.properties
-rwxr-xr-x 1 dc user 2521 Jul 17 18:31 canal.properties.bak
-rwxr-xr-x 1 dc user 3045 Jul 17 18:31 logback.xml
drwxr-xr-x 2 dc user 4096 Jul 17 18:38 spring
drwxr-xr-x 2 dc user 4096 Jul 19 11:55 trans1
## mysql serverId 部署ha的话,slaveId不能重复
canal.instance.mysql.slaveId = 1235
canal.instance.master.address = 10.172.152.66:3306
# username/password
canal.instance.dbUsername = root
canal.instance.dbPassword = root
# 采集表的正则
canal.instance.filter.regex = .*\\..*
canal server HA部署
修改配置文件,配置文件是conf目录下大canal.properties
# common argument
canal.id= 1 # 另一台 canal.id= 2
canal.ip= 10.172.152.66 # 另一台 canal.ip= 10.172.152.124
canal.port= 11111
canal.zkServers= 10.172.152.66:2181,10.172.152.124:2181,10.172.152.125:2181
# conf目录下mysql实例的文件名
canal.destinations= trans1
# 使用ha必须使用default-instance.xml
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
下面看下client的设计,由于canal并没有继承client只是提供了一套client的api由用户自己去实现,则这里重点记录下client的设计。
canal client 功能设计
client消费原理
ack机制采用异步确认,也就是可以连续调用get多次,后续异步按顺序提交ack/rollback,这种机制在canal中称为流式 api设计。
get/ack异步化,减少因ack带来的网络延迟和操作成本 (99%的状态都是处于正常状态,异常的rollback属于个别情况,没必要为个别的case牺牲整个性能)
get获取数据后,业务消费存在瓶颈或者需要多进程/多线程消费时,可以不停的轮询get数据,不停的往后发送任务,提高并行化。
数据格式及使用
json格式如下:
{
"database":"test",
"table":"e",
"type":"delete/insert/update",
"executeTime":"1501645930000",
"consumerTime":"1501645930000",
"data":[
{"id":"1","num":"1"},
{"id":"2","num":"2"}
]
}
// DDL-json
{
"database":"test",
"table":"e",
"type":"create/alter/drop",
"executeTime":"1501645930000",
"consumerTime":"1501645930000",
"isDDL":"true/false",
"sql":"create table e(id int)"
}
解析DML-json的时候需要给数据新增一列来标识数据的状态,数据的状态是指数据是否被删除。
配置
DESTINATION = # 要消费的mysql实例
FILTER_REGEX = # 订阅的表信息的正则
BATCH_SIZE = # 批量获取的条数
DML_PATH =
DDL_PATH =
DELAY_TIME = # 延迟报警的阈值,单位ms
多线程异步处理数据
ackQueue
,get不断的获取数据,将message交给新线程处理并将
batchId
放入ackQueue中,待新线程处理完message之后进行ack确认,从ackQueue中取出batchId按顺序确认,如遇到异常进行回滚。
try{
while(true){
Message message = connector.getWithoutAck(BATCH_SIZE); // 获取指定数量的数据
batchId = message.getId();
ackQueue.put(batchId);
executorService.submit(new Runnable() {
public void run() {
// 需要一个线程和文件的映射,防止多个线程写同一个文件
parseMessage(message);
// 判断batchId是否和ackQueue中取得的batchId一致,大于则等待小于报异常
ackMessage(batchId);
}
});
}
}catch (Exception e){
connector.rollback(batchId);
}finally {
...
}
数据归档
这样既可以提高写的效率又可以减少对hdfs的操作,并且在上传hdfs时可以对数据进行合并,从源头上减少小文件的生成。
数据文件切分可以按照持有一个文件句柄的时间来进行切分并且到零点统一关闭所有句柄。
使用binlog中的executeTime进行文件切分,保证数据归档的时间准备性。
跨网络传输
北京的数据向杭州传输采用两种方案:
client将数据写入本地,然后通过rsync传输到杭州服务器
client调用Avro rpc将数据写入杭州的flume agent的Avro source中,通过fileChannel将数据写入杭州服务器。
方案1使用rsync进行数据传输,简单方便只开一次接口权限。而且client和杭州的client一致,不需要额外的开发。
由于一个mysql实例对应一个client,则会需要多个port进行数据传输。
建议rsync同步
监控报警
也可以判断一个时间窗口中两个时间点差值进行是否消费滞后的判断。
如何高效学习分布式算法?
如何高效学习分布式算法?
如果你留心观察,会发现有不少人看了很多资料和书籍,涉及到具体问题,照样一头雾水,比如:
• 拜占庭错误是怎么回事?为什么区块链用拜占庭容错算法?Paxos 算法不行吗?能黑比特币吗?
• 想要实现数据副本的一致性,到底该选 Paxos 算法,还是 Raft 算法?
• 为什么我的集群接入性能低?稍微出现峰值流量,为什么业务就基本不可用了?
• 如何设计分布式系统架构呢?那么多算法,Paxos、Raft、Gossip、Nuorum NWR、PBFT 等等,究竟该选择哪个?
其实,算法相对抽象,即使是非常经典的论文,也有一些关键细节没有交代清楚。而网上的信息大多是“复制粘贴”的结果,甚至有不少错误,给自主学习带来了很多障碍和误导。
在我看来,要掌握这部分内容,不仅要理解常用算法的原理、特点和局限性,还要根据场景特点选择适合的分布式算法。
刚好,极客时间上线了一个新专栏《分布式协议与算法实战》,作者是腾讯资深工程师韩健,我有幸提前看到了一部分内容,很想推荐给你。
在专栏中,他分享了自己支撑海量互联网服务中的分布式算法实战心得,让你学完就能在工作中根据场景特点,灵活地设计架构和运用分布式算法,开发出适合该场景的分布式系统,对架构设计的理解,也会更上一层楼。
以上是关于实战 | MySQL Binlog通过Canal同步HDFS的主要内容,如果未能解决你的问题,请参考以下文章 docker环境安装mysqlcanalelasticsearch,基于binlog利用canal实现mysql的数据同步到elasticsearch中 flinksql从kafka中消费mysql的binlog日志