如何使用 debezium mysql 连接器 kafka 进行初始快照加载?

Posted

技术标签:

【中文标题】如何使用 debezium mysql 连接器 kafka 进行初始快照加载?【英文标题】:How to do an initial snapshot load with debezium mysql connector kafka? 【发布时间】:2020-07-20 07:09:32 【问题描述】:

我正在使用 Kafka 集群和 Debezium mysql 连接器从数据库更新中获取消息到 Elasticsearch。有一段时间我做了一些测试,结果我得到了一个混合的 ES,所以我想对 Elasticsearch 做一个全新的完整加载。

所以我想停止 debezium 连接器以停止馈送 ES 并删除所有索引,以便当我再次启动连接器时它可以满载。据我所知,连接器仅适用于 pause/resume 操作,并且不会执行初始加载。

此时,我将手动删除连接器和创建的主题并重新创建它们,以便它可以按初始状态加载,但是关于如何以正确的步骤执行此清理过程的任何想法?

【问题讨论】:

Debezium 确实对新的连接器创建进行了批量加载。所以删除旧的。不要暂停 感谢您的回答 cricket,但由于 kafka 主题中的偏移量已经到位,如果我只删除并重新创建连接器,它将不会再次加载数据。 你也许可以使用kafka-consumer-groups --reset-offsets 【参考方案1】:

我不知道是否有更好的方法,但这个解决方案对我有用:

    已停止 /etc/kafka/connect-distributed.properties 守护程序的服务。
sudo systemctl stop confluent-connect-distributed.service
    删除Debezium的连接器
curl -X DELETE http://localhost:8083/connectors/<connector name>
    删除所有与我的工作相关的主题,在这种情况下,这个 kafka 集群是一个开发者,所以我删除了所有主题,甚至是默认主题。
__consumer_offsets
_confluent-command
_schemas
connect-configs
connect-offsets
connect-status
kafka-topics --bootstrap-server <kafka bootstrap> --delete --topic <topic name>
    再次创建默认主题
kafka-topics --create --bootstrap-server <boostrap kafka> --topic connect-configs --replication-factor 3 --partitions 1 --config cleanup.policy=compact
kafka-topics --create --bootstrap-server <boostrap kafka> --topic connect-offsets --replication-factor 3 --partitions 50 --config cleanup.policy=compact
kafka-topics --create --bootstrap-server <boostrap kafka> --topic connect-status --replication-factor 3 --partitions 10 --config cleanup.policy=compact
    再次启动服务
sudo systemctl start confluent-connect-distributed.service
    再次创建连接器
curl -s -i -X PUT -H  "Content-Type:application/json" \
    http://localhost:8083/connectors/<connector name>/config \
    -d '<json>'

这样您最终将在 elasticsearch 中获得初始快照。

【讨论】:

您找到更好的方法了吗?我对此有几乎相同的过程 不,我已经用 bash 脚本自动化了这个过程并进行了初始快照。所以现在我只运行了一个包含所有这些内容的脚本,并且脚本由环境分隔。

以上是关于如何使用 debezium mysql 连接器 kafka 进行初始快照加载?的主要内容,如果未能解决你的问题,请参考以下文章

如何配置 Debezium Mysql 连接器以生成原始键而不是 struct 或 json 对象?

Debezium MySQL (MariaDB) 连接器:如何从以前的 bin-log 文件位置恢复

Debezium - 自定义负载 - MySQL 连接器

使用表白名单选项更新 Debezium MySQL 连接器

MySQL 的 Debezium 连接器。缺少数据库历史主题

Debezium Mysql连接器解析器下的IncompatibleClassChangeError