TIDB - 使用 TICDC 将数据同步至下游 Kafka 中

Posted 小毕超

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TIDB - 使用 TICDC 将数据同步至下游 Kafka 中相关的知识,希望对你有一定的参考价值。

一、TICDC

在上篇文章中,我们介绍了使用TICDC 将数据同步至 mysql 中,从上个任务就可以看出,TiCDC相比于Tidb binlog 在配制上就简化了很多,而且我们也知道TICDC的性能也是优于 tidb binlog的,今天我们学习下使用TiCDC怎么将数据同步至下游Kafka中,以实现TIDB 到 ES、MongoDB、Redis等 NoSql 数据库的同步。

上篇博客地址:

https://blog.csdn.net/qq_43692950/article/details/121731278

注意:使用TiCDC ,需将TIDB版本上级至 v4.0.6 以上。

二、TICDC 配制数据同步Kafka

本篇文章接着上篇文章继续讲解,先看下现在的集群状况:


还是上篇文章中我们扩容出的CDC-server。

在上篇文章中,我们已经创建了TIDB 到 mysql 数据同步的任务,现在我们再创建一个到Kafka的同步任务:

./cdc cli changefeed create --pd=http://192.168.40.160:2379 --sink-uri='kafka://192.168.40.1:9092/tidb-cdc?kafka-version=2.6.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&protocol=canal-json' --changefeed-id="replication-task-2"

tidb-cdc:表示topic
kafka-version:下游 Kafka 版本号(可选,默认值 2.4.0,目前支持的最低版本为 0.11.0.2
kafka-client-id:指定同步任务的 Kafka 客户端的 ID(可选,默认值为 TiCDC_sarama_producer_同步任务的 ID
partition-num:下游 Kafka partition 数量(可选,不能大于实际 partition 数量。如果不填会自动获取 partition 数量。
protocol:表示输出到 kafka 消息协议,可选值有 default、canal、avro、maxwell、canal-json(默认值为 default
max-message-bytes:每次向 Kafka broker 发送消息的最大数据量(可选,默认值 64MB
replication-factor:kafka 消息保存副本数(可选,默认值 1
ca:连接下游 Kafka 实例所需的 CA 证书文件路径(可选)
cert:连接下游 Kafka 实例所需的证书文件路径(可选)
key:连接下游 Kafka 实例所需的证书密钥文件路径(可选)


已经创建成功。

使用下面命令就可以看到,所有的任务:

./cdc cli changefeed list --pd=http://192.168.40.160:2379

或者查看我们任务的详细情况:

./cdc cli changefeed query --pd=http://192.168.40.160:2379 --changefeed-id=replication-task-2

三、SpringBoot Kafka监听

添加POM依赖

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

application


server:
  port: 8081

spring:
  kafka:
    # kafka服务器地址(可以多个)
#    bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
    bootstrap-servers: 192.168.40.1:9092
    consumer:
      # 指定一个默认的组名
      group-id: kafkaGroup
      # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
      # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
      # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
      auto-offset-reset: earliest

      # key/value的反序列化
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
#      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
    producer:
      # key/value的序列化
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      # 批量抓取
      batch-size: 65536
      # 缓存容量
      buffer-memory: 524288
      #失败重试次数
      retries: 3
      # 服务器地址
#      bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092

消费者监听事件

@Slf4j
@Component
public class Jms_Consumer 

    @KafkaListener(topics = "tidb-cdc")
    public void receive4(ConsumerRecord<?, ?> consumer) throws Exception 
        System.out.println("tidb tidb-cdc  Listener >> ");
        JSONObject jsonObject = JSONObject.parseObject(new String(consumer.value()));
        String type = jsonObject.getString("type");
        String db = jsonObject.getString("database");
        String table = jsonObject.getString("table");
        String data = jsonObject.getString("data");
        log.info("操作类型:",type);
        log.info("数据库:",db);
        log.info("数据表:",table);
        log.info("更新后数据:",data);
    



四、测试数据同步

向TIDB中插入数据:

insert into user(name,age) value('bxc','25');

kafka接受JSON


	"id": 0,
	"database": "testdb",
	"table": "user",
	"pkNames": ["id"],
	"isDdl": false,
	"type": "INSERT",
	"es": 1638698748819,
	"ts": 0,
	"sql": "",
	"sqlType": 
		"age": -5,
		"id": -5,
		"name": 12
	,
	"mysqlType": 
		"age": "int",
		"id": "int",
		"name": "varchar"
	,
	"data": [
		"age": "25",
		"id": "242219",
		"name": "bxc"
	],
	"old": [null]


更新数据:

update user set age=24 where name = 'bxc';

Kafka接受JSON


	"id": 0,
	"database": "testdb",
	"table": "user",
	"pkNames": ["id"],
	"isDdl": false,
	"type": "UPDATE",
	"es": 1638699660093,
	"ts": 0,
	"sql": "",
	"sqlType": 
		"age": -5,
		"id": -5,
		"name": 12
	,
	"mysqlType": 
		"age": "int",
		"id": "int",
		"name": "varchar"
	,
	"data": [
		"age": "24",
		"id": "242216",
		"name": "bxc"
	],
	"old": [
		"age": "23",
		"id": "242216",
		"name": "bxc"
	]


删除数据:

delete from user where name = 'bxc';

Kafka接受JSON


	"id": 0,
	"database": "testdb",
	"table": "user",
	"pkNames": ["id"],
	"isDdl": false,
	"type": "DELETE",
	"es": 1638699773943,
	"ts": 0,
	"sql": "",
	"sqlType": 
		"age": -5,
		"id": -5,
		"name": 12
	,
	"mysqlType": 
		"age": "int",
		"id": "int",
		"name": "varchar"
	,
	"data": [
		"age": "25",
		"id": "242218",
		"name": "bxc"
	],
	"old": [
		"age": "25",
		"id": "242218",
		"name": "bxc"
	]

五、扩展

停止同步任务:

./cdc cli changefeed pause --pd=http://192.168.40.160:2379 --changefeed-id replication-task-2

删除同步任务

./cdc cli changefeed remove --pd=http://192.168.40.160:2379 --changefeed-id replication-task-2


喜欢的小伙伴可以关注我的个人微信公众号,获取更多学习资料!

以上是关于TIDB - 使用 TICDC 将数据同步至下游 Kafka 中的主要内容,如果未能解决你的问题,请参考以下文章

TIDB - 使用 TICDC 将数据同步至下游 Mysql 中

TiDB Online DDL 在 TiCDC 中的应用丨TiDB 工具分享

TIDB - 使用 TiDB Binlog 将日志同步至下游 Kafka 中

TiCDC 源码阅读TiCDC 集群工作过程解析

TiCDC 源码阅读TiCDC 集群工作过程解析

TiCDC 源码阅读TiKV CDC 模块介绍