flinksql从kafka中消费mysql的binlog日志
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flinksql从kafka中消费mysql的binlog日志相关的知识,希望对你有一定的参考价值。
参考技术A
*使用canal采集mysql的binlog,输出到kafka,然后使用flinksql消费kafka,并输出到屏幕
1、vim /home/jaming/docker_work/mysql/conf/my.cnf
2、docker查看mysql的binlog开启是否成功
版本:canal.deployer-1.1.5-SNAPSHOT.tar.gz
1、vim /home/jaming/software/canal/conf/example/instance.properties
2、vim /home/jaming/software/canal/conf/canal.properties
略。。。。
kafka 消费?
- kafka消费中的问题及解决方法:
- 情况1:
- 问题:脚本读取kafka 数据,写入到数据库,有时候出现MySQL server has gone away,导致脚本死掉。再次启动,这过程中的kafka数据丢失。
- 原因:MySQL server has gone away 出现可能是连接超时,可能超过每秒请求上限…这些异常是小概率事件,难以避免。git kafka 的demo脚本是实时监听的脚本, 简单明了,没有再去针对kafka偏移量研究;但是一旦断掉, 过程中的kafka数据即丢失。
- 解决思路MySQL server has gone away无法避免,try catch 即可,异常消息存入日志中。通过数据补偿脚本把日志中的数据存入数据库,做到正常 异常的数据最终都能录入数据库。且脚本也不会异常死掉。脚本其他原因死掉(如服务器宕机),通过集群控制风险。
- 情况2:
- 问题:kafka消费时做数据检验,数据处理。代码看的很混乱, 且一旦业务调整或者有bug意味着要停止消费后重启。风险高,维护困难。
- 解决思路:解藕。kafka消费脚本只负责把数据写入数据库,标识初始状态。业务处理脚本集中处理数据。
以上是关于flinksql从kafka中消费mysql的binlog日志的主要内容,如果未能解决你的问题,请参考以下文章
Flink系列之:Debezium采集Mysql数据库表数据到Kafka Topic,同步kafka topic数据到StarRocks数据库
Spark从Kafka获取数据写入MySQL的实现(流式数据)
Spark从Kafka获取数据写入MySQL的实现(流式数据)
Demo:基于 Flink SQL 构建流式应用
kafka 消费?
flink sql client 连接kafka解析avro数据 (avro ArrayIndexOutOfBoundsException 解决办法)