TIDB - 使用 TiDB Binlog 将日志同步至下游 Kafka 中

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TIDB - 使用 TiDB Binlog 将日志同步至下游 Kafka 中相关的知识,希望对你有一定的参考价值。

一、TiDB Binlog

在上篇文章中我们介绍了使用TiDB Binlog将数据同步至下游的mysql 中,本篇我们学习下使用TiDB Binlog工具将数据同步至Kafka中自定义业务逻辑,比如可以做TIDB和ES、MongoDB 或 Redis的数据同步,这功能就和Canal解析Mysql的binlog功能相差不大。如果还不了解TiDB Binlog工具的也可以参考我的上篇博客:

https://blog.csdn.net/qq_43692950/article/details/121597230

注意:在做实验前,请确保已经配置好Kafka环境:不了解的可以参考下面一篇我的博客:

消息中间件KafKa集群搭建与使用: https://blog.csdn.net/qq_43692950/article/details/110648852

二、TiDB Binlog 配置

在上篇文章中,我们使用tiup 扩容出了一个pump 和 一个 drainer,我们先看下现在的集群架构:


但上篇文章我们讲解的是TIDB 到 Mysql之间的同步,如果换成Kafka只需修改下配置文件即可,但考虑到有些小伙伴可能没有看过前面我们的系列教程,这里我们还是通过扩容的方式扩容出pump 和 drainer,如果已经安装过pump 和 drainer,直接修改配置即可:

tiup cluster edit-config tidb-test
drainer_servers:
- host: 192.168.40.162
  ssh_port: 22
  port: 8249
  deploy_dir: /tidb-deploy/drainer-8249
  data_dir: /tidb-data/drainer-8249
  log_dir: /tidb-deploy/drainer-8249/log
  config:
    syncer.db-type: kafka
    syncer.to.kafka-addrs: 192.168.40.1:9092
    syncer.to.kafka-version: 2.6.0
    syncer.to.topic-name: tidb-test
  arch: amd64
  os: linux


修改上述kafka的指向即可,如果是kafka集群,用英文逗号隔开即可。

下面我们再讲下扩容的方式,没有安装pump 和 drainer的就用看下面的方式:

编写扩容配置

vi scale-out-binlog.yaml

写入以下内容:

pump_servers:
- host: 192.168.40.160
  ssh_port: 22
  port: 8250
  deploy_dir: /tidb-deploy/pump-8250
  data_dir: /tidb-data/pump-8250
  log_dir: /tidb-deploy/pump-8250/log
  config:
    gc: 7
    storage.stop-write-at-available-space: 200MB
  arch: amd64
  os: linux
drainer_servers:
- host: 192.168.40.162
  ssh_port: 22
  port: 8249
  deploy_dir: /tidb-deploy/drainer-8249
  data_dir: /tidb-data/drainer-8249
  log_dir: /tidb-deploy/drainer-8249/log
  config:
    syncer.db-type: kafka
    syncer.to.kafka-addrs: 192.168.40.1:9092
    syncer.to.kafka-version: 2.6.0
    syncer.to.topic-name: tidb-test
  arch: amd64
  os: linux

注意 storage.stop-write-at-available-space 这个参数表示存储空间低于指定值时不再接收 binlog 写入请求,默认为10G ,如果硬盘没这么大,就调小一点。

开始扩容:

tiup cluster scale-out tidb-test scale-out-binlog.yaml -u root -p

等待一会就可以看到集群中已经有pump 和 drainer了:


下一步还要开启TIDB的binglog配制:

tiup cluster edit-config tidb-test

修改 server_configs 的配制:

server_configs:
  tidb:
    binlog.enable: true
    binlog.ignore-error: true


重新加载集群:

tiup cluster reload tidb-test

使用mysql 客户端连接tidb,查看bnlog是否已经开启:

show variables like "log_bin";


ON即为开启状态。

看下pump和drainer的状态:

show pump status;

show drainer status;


状态都为online在线状态。

三、SpringBoot 消息监听客户端

下载官方demo

https://github.com/pingcap/tidb-tools/tree/master/tidb-binlog/driver/example/kafkaReader

官方demo是直接用的Java Kafka Api,本篇我们使用SpringBoot 的 spring-kafka 。

下载之后需要将三个文件复制到自己的SpringBoot项目中:

需要用这三个工具进行解析数据,不然解析出来的是乱码,这点可以去Tidb的社区看下:

POM文件引入的主要依赖:

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java</artifactId>
    <version>3.9.0</version>
</dependency>

<!-- https://mvnrepository.com/artifact/com.google.protobuf/protobuf-java-util -->
<dependency>
    <groupId>com.google.protobuf</groupId>
    <artifactId>protobuf-java-util</artifactId>
    <version>3.9.1</version>
</dependency>

application配制信息:


server:
  port: 8080

spring:
  kafka:
    # kafka服务器地址(可以多个)
#    bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
    bootstrap-servers: 192.168.40.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: kafkaGroup
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest

      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288
      #失败重试次数
      retries: 3
      # 服务器地址
#      bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092

注意consumer.value-deserializer这个要使用ByteArrayDeserializer,主要发送端就是byte[],我们只能配合:

日志监听:

@Slf4j
@Component
public class TidbConsumer 

    @KafkaListener(topics = "tidb-test")
    public void receive3(ConsumerRecord<String, byte[]> consumer) throws Exception 
        System.out.println("tidb bing-log  Listener >> ");
        //binglog对象
        BinLogInfo.Binlog binlog = BinLogInfo.Binlog.parseFrom(consumer.value());
        //操作类型  0 DML 1 DDL
        BinLogInfo.BinlogType type = binlog.getType();
        log.info(binlog.toString());
        log.info("操作类型: ", type);
        //解析内容
        if (BinLogInfo.BinlogType.DML == type) 
            BinLogInfo.DMLData dmlData = binlog.getDmlData();
            if (dmlData.getTablesCount() == 0) 
                return;
            
            dmlData.getTablesList().forEach(table -> 
                String db = table.getSchemaName();
                log.info("更新数据库:", db);
                String tableName = table.getTableName();
                log.info("更新数据表:", tableName);
                List<BinLogInfo.ColumnInfo> columnInfoList = table.getColumnInfoList();
                List<BinLogInfo.TableMutation> MutationsList = table.getMutationsList();
                MutationsList.forEach(mutation -> 
                    BinLogInfo.MutationType mutationType = mutation.getType();
                    log.info("操作类型:", mutationType);
                    List<BinLogInfo.Column> columnsList = mutation.getRow().getColumnsList();
                    //解析更新后的数据
                    for (int i = 0; i < columnInfoList.size(); i++) 
                        String filedName = columnInfoList.get(i).getName();
                        log.info("字段: ,更新后的值: ", filedName, columnsList.get(i));
                    
                );
            );
         else if (BinLogInfo.BinlogType.DDL == type) 
            BinLogInfo.DDLData ddlData = binlog.getDdlData();
            String db  = ddlData.getSchemaName();
            String tableName = ddlData.getTableName();
            String ddlSql = ddlData.getDdlQuery().toStringUtf8();
            log.info("更新数据库:",db);
            log.info("更新数据表:", tableName);
            log.info("DDL :", ddlSql);
         else 
            throw new Exception("analysis binglog err!");
        
    

四、测试

测试表结构:

添加数据:

insert into user(name,age) values('bxc',10);

BinLogInfo.Binlog toString信息:

type: DML
commit_ts: 429572910085570562
dml_data 
  tables 
    schema_name: "testdb"
    table_name: "user"
    column_info 
      name: "id"
      mysql_type: "int"
      is_primary_key: true
    
    column_info 
      name: "name"
      mysql_type: "varchar"
      is_primary_key: false
    
    column_info 
      name: "age"
      mysql_type: "int"
      is_primary_key: false
    
    mutations 
      type: Insert
      row 
        columns 
          int64_value: 212247
        
        columns 
          string_value: "bxc"
        
        columns 
          int64_value: 10
        
      
    
    5: 
      1: "PRIMARY"
      2: "id"
    
  

解析打印信息:

更新数据:

update user set age = 20 where name = 'bxc';


删除数据:

delete from user where name = 'bxc';


创建一个新的表:

CREATE TABLE `user_copy` (
  `id` int(11) NOT NULL AUTO_INCREMENT,
  `name` varchar(255) DEFAULT NULL,
  `age` int(10) DEFAULT NULL,
  PRIMARY KEY (`id`) /*T![clustered_index] CLUSTERED */
) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=242214;

BinLogInfo.Binlog toString信息:

type: DDL
commit_ts: 429573182230102017
ddl_data 
  schema_name: "testdb"
  table_name: "user_copy"
  ddl_query: "CREATE TABLE `user_copy` (  `id` int(11) NOT NULL AUTO_INCREMENT,  `name` varchar(255) DEFAULT NULL,  `age` int(10) DEFAULT NULL,  PRIMARY KEY (`id`)  ) ENGINE=InnoDB DEFAULT CHARSET=utf8 COLLATE=utf8_bin AUTO_INCREMENT=242214"



喜欢的小伙伴可以关注我的个人微信公众号,获取更多学习资料!

以上是关于TIDB - 使用 TiDB Binlog 将日志同步至下游 Kafka 中的主要内容,如果未能解决你的问题,请参考以下文章

TIDB - 使用 TICDC 将数据同步至下游 Mysql 中

TiDB Binlog工具Drainer使用

Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB

Flink 最佳实践之使用 Canal 同步 MySQL 数据至 TiDB