如何使用debezium更改数据捕获在mysql中捕获数据并在kafka connect中使用jdbc sink?

Posted

技术标签:

【中文标题】如何使用debezium更改数据捕获在mysql中捕获数据并在kafka connect中使用jdbc sink?【英文标题】:How to capture data in mysql with debezium change data capture and consume with jdbc sink in kafka connect? 【发布时间】:2017-10-13 13:59:31 【问题描述】:

我在使用 debezium 更改数据捕获的 mysql 中捕获数据并使用 kafka connect jdbc sink 将其消耗到另一个 mysql 时遇到问题。

因为 debezium 生成到 kafka topic 的 schema 和 payload 与 kafka connect jdbc sink 期望的 schema 不兼容。

当 jdbc sink 想要消费数据并在另一个 mysql 中创建记录时出现异常。

我应该如何解决这个问题?

【问题讨论】:

【参考方案1】:

Debezium 生成的消息结构确实与 JDBC 接收器预期的不同。 JDBC 接收器期望消息中的每个字段对应于行中的一个字段,因此消息对应于行的“之后”状态。 OTOH,Debezium MySQL connector 执行变更数据捕获,这意味着它不仅仅包含行的最新状态。具体来说,连接器输出带有包含行的主键或唯一键列的键和包含信封结构的消息值的消息:

操作,例如是插入、更新还是删除 更改发生之前行的状态(插入时为空) 发生更改后行的状态(删除时为空) 特定于源的信息,包括服务器元数据、事务 ID、数据库和表名称、事件发生时的服务器时间戳以及有关事件在何处找到的详细信息等。 连接器生成事件的时间戳

解决这种差异的最简单方法是使用 Kafka 0.10.2.x(当前最新版本为 0.10.2.1)和 Kafka Connect 的新 Single Message Transforms (SMTs)。每个 Kafka Connect 连接器都可以配置零个或多个 SMT 链,这些 SMT 链可以在将消息写入 Kafka 之前转换源连接器的输出,或者在将消息作为输入传递到接收器连接器之前转换从 Kafka 读取的消息。 SMT 故意非常简单,处理单个消息,绝对不应该访问外部资源或维护任何状态,因此不能替代 Kafka Streams 或其他更强大的流处理系统,可以加入多个输入流,并且可以执行非常复杂的操作并跨多个消息维护状态。

如果您使用 Kafka Streams 进行任何类型的处理,那么您应该考虑在您的 Kafka Streams 应用程序中操作消息结构。如果没有,那么 SMT 是解决问题的好方法。实际上,有两种方法可以使用 SMT 来调整消息结构。

第一个选项是使用带有 Debezium 连接器的 SMT 来提取/保留行的“之后”状态,并在将其写入 Kafka 之前丢弃所有其他信息。当然,您会在 Kafka 主题中存储更少的信息,并丢弃一些未来可能有价值的 CDC 信息。

第二个也是 IMO 首选的选项是让源连接器保持原样并将所有 CDC 消息保留在 Kafka 主题中,然后使用带有接收器连接器的 SMT 来提取/保留“之后”状态在将消息传递到 JDBC 接收器连接器之前丢弃所有其他信息。您也许可以使用 Kafka Connect 中包含的现有 SMT 之一,但您可以考虑编写自己的 SMT 来完全满足您的需求。

【讨论】:

感谢亲爱的 Randall 的精彩回答。 请注意,较新的 Debezium 版本提供了开箱即用的 such SMT。我们还有一个complete example,它显示了与 JDBC 接收器连接器的交互。

以上是关于如何使用debezium更改数据捕获在mysql中捕获数据并在kafka connect中使用jdbc sink?的主要内容,如果未能解决你的问题,请参考以下文章

MySQL使用Debezium更改为Kafka-仅捕获DDL stmts

Debezium 创建新主题时如何编辑复制因子

MySQL 的 Debezium 连接器。缺少数据库历史主题

debezium 是不是支持捕获 postgres 模式更改事件?

使用 Debezium 将所有数据库表捕获到一个 Kafka 主题 [重复]

如何使用在 docker 上运行的 debezium 和 confluent-sink-connector 将所有更改从源数据库复制到目标数据库