TIDB - 使用 TiDB Binlog 将日志同步至下游 Kafka 中
Posted 小毕超
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TIDB - 使用 TiDB Binlog 将日志同步至下游 Kafka 中相关的知识,希望对你有一定的参考价值。
一、TiDB Binlog
在上篇文章中我们介绍了使用TiDB Binlog将数据同步至下游的mysql 中,本篇我们学习下使用TiDB Binlog工具将数据同步至Kafka中自定义业务逻辑,比如可以做TIDB和ES、MongoDB 或 Redis的数据同步,这功能就和Canal解析Mysql的binlog功能相差不大。如果还不了解TiDB Binlog工具的也可以参考我的上篇博客:
https://blog.csdn.net/qq_43692950/article/details/121597230
注意:在做实验前,请确保已经配置好Kafka环境:不了解的可以参考下面一篇我的博客:
消息中间件KafKa集群搭建与使用: https://blog.csdn.net/qq_43692950/article/details/110648852
二、TiDB Binlog 配置
在上篇文章中,我们使用tiup 扩容出了一个pump 和 一个 drainer,我们先看下现在的集群架构:
但上篇文章我们讲解的是TIDB 到 Mysql之间的同步,如果换成Kafka只需修改下配置文件即可,但考虑到有些小伙伴可能没有看过前面我们的系列教程,这里我们还是通过扩容的方式扩容出pump 和 drainer,如果已经安装过pump 和 drainer,直接修改配置即可:
tiup cluster edit-config tidb-test
drainer_servers:
- host: 192.168.40.162
ssh_port: 22
port: 8249
deploy_dir: /tidb-deploy/drainer-8249
data_dir: /tidb-data/drainer-8249
log_dir: /tidb-deploy/drainer-8249/log
config:
syncer.db-type: kafka
syncer.to.kafka-addrs: 192.168.40.1:9092
syncer.to.kafka-version: 2.6.0
syncer.to.topic-name: tidb-test
arch: amd64
os: linux
修改上述kafka的指向即可,如果是kafka集群,用英文逗号隔开即可。
下面我们再讲下扩容的方式,没有安装pump 和 drainer的就用看下面的方式:
编写扩容配置
vi scale-out-binlog.yaml
写入以下内容:
pump_servers:
- host: 192.168.40.160
ssh_port: 22
port: 8250
deploy_dir: /tidb-deploy/pump-8250
data_dir: /tidb-data/pump-8250
log_dir: /tidb-deploy/pump-8250/log
config:
gc: 7
storage.stop-write-at-available-space: 200MB
arch: amd64
os: linux
drainer_servers:
- host: 192.168.40.162
ssh_port: 22
port: 8249
deploy_dir: /tidb-deploy/drainer-8249
data_dir: /tidb-data/drainer-8249
log_dir: /tidb-deploy/drainer-8249/log
config:
syncer.db-type: kafka
syncer.to.kafka-addrs: 192.168.40.1:9092
syncer.to.kafka-version: 2.6.0
syncer.to.topic-name: tidb-test
arch: amd64
os: linux
注意 storage.stop-write-at-available-space 这个参数表示存储空间低于指定值时不再接收 binlog 写入请求,默认为10G ,如果硬盘没这么大,就调小一点。
开始扩容:
tiup cluster scale-out tidb-test scale-out-binlog.yaml -u root -p
等待一会就可以看到集群中已经有pump 和 drainer了:
下一步还要开启TIDB的binglog配制:
tiup cluster edit-config tidb-test
修改 server_configs 的配制:
server_configs:
tidb:
binlog.enable: true
binlog.ignore-error: true
重新加载集群:
tiup cluster reload tidb-test
使用mysql 客户端连接tidb,查看bnlog是否已经开启:
show variables like "log_bin";
ON即为开启状态。
看下pump和drainer的状态:
show pump status;
show drainer status;
状态都为online在线状态。
三、SpringBoot 消息监听客户端
下载官方demo
https://github.com/pingcap/tidb-tools/tree/master/tidb-binlog/driver/example/kafkaReader
官方demo是直接用的Java Kafka Api,本篇我们使用SpringBoot 的 spring-kafka 。
下载之后需要将三个文件复制到自己的SpringBoot项目中:
需要用这三个工具进行解析数据,不然解析出来的是乱码,这点可以去Tidb的社区看下:
POM文件引入的主要依赖:
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>3.9.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java-util -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java-util</artifactId>
<version>3.9.1</version>
</dependency>
application配制信息:
server:
port: 8080
spring:
kafka:
# kafka服务器地址(可以多个)
# bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
bootstrap-servers: 192.168.40.1:9092
consumer:
# 指定一个默认的组名
group-id: kafkaGroup
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: earliest
# key/value的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
producer:
# key/value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 批量抓取
batch-size: 65536
# 缓存容量
buffer-memory: 524288
#失败重试次数
retries: 3
# 服务器地址
# bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
注意consumer.value-deserializer这个要使用ByteArrayDeserializer,主要发送端就是byte[],我们只能配合:
日志监听:
@Slf4j
@Component
public class TidbConsumer
@KafkaListener(topics = "tidb-test")
public void receive3(ConsumerRecord<String, byte[]> consumer) throws Exception
System.out.println("tidb bing-log Listener >> ");
//binglog对象
BinLogInfo.Binlog binlog = BinLogInfo.Binlog.parseFrom(consumer.value());
//操作类型 0 DML 1 DDL
BinLogInfo.BinlogType type = binlog.getType();
log.info(binlog.toString());
log.info("操作类型: ", type);
//解析内容
if (BinLogInfo.BinlogType.DML == type)
BinLogInfo.DMLData dmlData = binlog.getDmlData();
if (dmlData.getTablesCount() == 0)
return;
dmlData.getTablesList().forEach(table ->
String db = table.getSchemaName();
log.info("更新数据库:", db);
String tableName = table.getTableName();
log.info("更新数据表:", tableName);
List<BinLogInfo.ColumnInfo> columnInfoList = table.getColumnInfoList();
List<BinLogInfo.TableMutation> MutationsList = table.getMutationsList();
MutationsList.forEach(mutation ->
BinLogInfo.MutationType mutationType = mutation.getType();
log.info("操作类型:", mutationType);
List<BinLogInfo.Column> columnsList = mutation.getRow().getColumnsList();
//解析更新后的数据
for (int i = 0; i < columnInfoList.size(); i++)
String filedName = columnInfoList.get(i).getName();
log.info("字段: ,更新后的值: ", filedName, columnsList.get(i));
);
);
else if (BinLogInfo.BinlogType.DDL == type)
BinLogInfo.DDLData ddlData = binlog.getDdlData();
String db = ddlData.getSchemaName();
String tableName = ddlData.getTableName();
String ddlSql = ddlData.getDdlQuery().toStringUtf8();
log.info("更新数据库:",db);
log.info("更新数据表:", tableName);
log.info("DDL :", ddlSql);
else
throw new Exception("analysis binglog err!");
四、测试
测试表结构:
添加数据:
insert into user(name,age) values('bxc',10);
BinLogInfo.Binlog toString信息:
type: DML
commit_ts: 429572910085570562
dml_data
tables
schema_name: "testdb"
table_name: "user"
column_info
name: "id"
mysql_type: "int"
is_primary_key: true
column_info
name: "name"
mysql_type: "varchar"
is_primary_key: false
column_info
name: "age"
mysql_type: "int"
is_primary_key: false
mutations
type: Insert
row
columns
int64_value: 212247
columns
string_value: "bxc"
columns
int64_value: 10
5:
1: "PRIMARY"
2: "id"
解析打印信息:
更新数据:
update user set age = 20 where name = 'bxc';
删除数据:
delete from user where name = 'bxc';
创建一个新的表:
CREATE TABLE `user_copy` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(10) DEFAULT NULL,
PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=242214;
BinLogInfo.Binlog toString信息:
type: DDL
commit_ts: 429573182230102017
ddl_data
schema_name: "testdb"
table_name: "user_copy"
ddl_query: "CREATE TABLE `user_copy` ( `id` int(11) NOT NULL AUTO_INCREMENT, `name` varchar(255) DEFAULT NULL, `age` int(10) DEFAULT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=242214"
喜欢的小伙伴可以关注我的个人微信公众号,获取更多学习资料!
以上是关于TIDB - 使用 TiDB Binlog 将日志同步至下游 Kafka 中的主要内容,如果未能解决你的问题,请参考以下文章
TIDB - 使用 TICDC 将数据同步至下游 Mysql 中
Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB
Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB