TIDB - 使用 TICDC 将数据同步至下游 Kafka 中
Posted 小毕超
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了TIDB - 使用 TICDC 将数据同步至下游 Kafka 中相关的知识,希望对你有一定的参考价值。
一、TICDC
在上篇文章中,我们介绍了使用TICDC 将数据同步至 mysql 中,从上个任务就可以看出,TiCDC相比于Tidb binlog 在配制上就简化了很多,而且我们也知道TICDC的性能也是优于 tidb binlog的,今天我们学习下使用TiCDC怎么将数据同步至下游Kafka中,以实现TIDB 到 ES、MongoDB、Redis等 NoSql 数据库的同步。
上篇博客地址:
https://blog.csdn.net/qq_43692950/article/details/121731278
注意:使用TiCDC ,需将TIDB版本上级至 v4.0.6 以上。
二、TICDC 配制数据同步Kafka
本篇文章接着上篇文章继续讲解,先看下现在的集群状况:
还是上篇文章中我们扩容出的CDC-server。
在上篇文章中,我们已经创建了TIDB 到 mysql 数据同步的任务,现在我们再创建一个到Kafka的同步任务:
./cdc cli changefeed create --pd=http://192.168.40.160:2379 --sink-uri='kafka://192.168.40.1:9092/tidb-cdc?kafka-version=2.6.0&partition-num=1&max-message-bytes=67108864&replication-factor=1&protocol=canal-json' --changefeed-id="replication-task-2"
tidb-cdc:表示topic
kafka-version:下游 Kafka 版本号(可选,默认值 2.4.0,目前支持的最低版本为 0.11.0.2
kafka-client-id:指定同步任务的 Kafka 客户端的 ID(可选,默认值为 TiCDC_sarama_producer_同步任务的 ID
partition-num:下游 Kafka partition 数量(可选,不能大于实际 partition 数量。如果不填会自动获取 partition 数量。
protocol:表示输出到 kafka 消息协议,可选值有 default、canal、avro、maxwell、canal-json(默认值为 default
max-message-bytes:每次向 Kafka broker 发送消息的最大数据量(可选,默认值 64MB
replication-factor:kafka 消息保存副本数(可选,默认值 1
ca:连接下游 Kafka 实例所需的 CA 证书文件路径(可选)
cert:连接下游 Kafka 实例所需的证书文件路径(可选)
key:连接下游 Kafka 实例所需的证书密钥文件路径(可选)
已经创建成功。
使用下面命令就可以看到,所有的任务:
./cdc cli changefeed list --pd=http://192.168.40.160:2379
或者查看我们任务的详细情况:
./cdc cli changefeed query --pd=http://192.168.40.160:2379 --changefeed-id=replication-task-2
三、SpringBoot Kafka监听
添加POM依赖
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
application
server:
port: 8081
spring:
kafka:
# kafka服务器地址(可以多个)
# bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
bootstrap-servers: 192.168.40.1:9092
consumer:
# 指定一个默认的组名
group-id: kafkaGroup
# earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
# latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
# none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
auto-offset-reset: earliest
# key/value的反序列化
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.apache.kafka.common.serialization.ByteArrayDeserializer
# value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
# key/value的序列化
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.apache.kafka.common.serialization.StringSerializer
# 批量抓取
batch-size: 65536
# 缓存容量
buffer-memory: 524288
#失败重试次数
retries: 3
# 服务器地址
# bootstrap-servers: 192.168.159.128:9092,192.168.159.129:9092,192.168.159.130:9092
消费者监听事件
@Slf4j
@Component
public class Jms_Consumer
@KafkaListener(topics = "tidb-cdc")
public void receive4(ConsumerRecord<?, ?> consumer) throws Exception
System.out.println("tidb tidb-cdc Listener >> ");
JSONObject jsonObject = JSONObject.parseObject(new String(consumer.value()));
String type = jsonObject.getString("type");
String db = jsonObject.getString("database");
String table = jsonObject.getString("table");
String data = jsonObject.getString("data");
log.info("操作类型:",type);
log.info("数据库:",db);
log.info("数据表:",table);
log.info("更新后数据:",data);
四、测试数据同步
向TIDB中插入数据:
insert into user(name,age) value('bxc','25');
kafka接受JSON
"id": 0,
"database": "testdb",
"table": "user",
"pkNames": ["id"],
"isDdl": false,
"type": "INSERT",
"es": 1638698748819,
"ts": 0,
"sql": "",
"sqlType":
"age": -5,
"id": -5,
"name": 12
,
"mysqlType":
"age": "int",
"id": "int",
"name": "varchar"
,
"data": [
"age": "25",
"id": "242219",
"name": "bxc"
],
"old": [null]
更新数据:
update user set age=24 where name = 'bxc';
Kafka接受JSON
"id": 0,
"database": "testdb",
"table": "user",
"pkNames": ["id"],
"isDdl": false,
"type": "UPDATE",
"es": 1638699660093,
"ts": 0,
"sql": "",
"sqlType":
"age": -5,
"id": -5,
"name": 12
,
"mysqlType":
"age": "int",
"id": "int",
"name": "varchar"
,
"data": [
"age": "24",
"id": "242216",
"name": "bxc"
],
"old": [
"age": "23",
"id": "242216",
"name": "bxc"
]
删除数据:
delete from user where name = 'bxc';
Kafka接受JSON
"id": 0,
"database": "testdb",
"table": "user",
"pkNames": ["id"],
"isDdl": false,
"type": "DELETE",
"es": 1638699773943,
"ts": 0,
"sql": "",
"sqlType":
"age": -5,
"id": -5,
"name": 12
,
"mysqlType":
"age": "int",
"id": "int",
"name": "varchar"
,
"data": [
"age": "25",
"id": "242218",
"name": "bxc"
],
"old": [
"age": "25",
"id": "242218",
"name": "bxc"
]
五、扩展
停止同步任务:
./cdc cli changefeed pause --pd=http://192.168.40.160:2379 --changefeed-id replication-task-2
删除同步任务
./cdc cli changefeed remove --pd=http://192.168.40.160:2379 --changefeed-id replication-task-2
喜欢的小伙伴可以关注我的个人微信公众号,获取更多学习资料!
以上是关于TIDB - 使用 TICDC 将数据同步至下游 Kafka 中的主要内容,如果未能解决你的问题,请参考以下文章
TIDB - 使用 TICDC 将数据同步至下游 Mysql 中
TiDB Online DDL 在 TiCDC 中的应用丨TiDB 工具分享