PostgreSQL 和事务上的 Debezium CDC

Posted

技术标签:

【中文标题】PostgreSQL 和事务上的 Debezium CDC【英文标题】:Debezium CDC on PostgreSQL and transactions 【发布时间】:2020-11-28 10:57:09 【问题描述】:

我想在 PostgreSQL 数据库上使用 Debezium 进行变更数据捕获 (CDC)。现在我还不完全了解如何在 Debezium 中处理事务。

让我们考虑以下示例 - 我在源 PostgreSQL 数据库中有 2 个表。我在单个数据库事务的范围内将数据插入到这两个表中。换句话说,我必须执行两个单独的 INSERT 语句。如果我在 Debezium CDC 的情况下理解正确,它将导致两个单独的消息进入两个单独的 Kafka 主题(或 AWS Kinesis 流)。这些消息中的每一个都将由自己的消费者使用,并在单独的事务中插入到目标数据库中。如果其中一个事务失败,另一个事务可以成功,我会在目标数据库中遇到数据不一致状态。

Debezium 中是否有任何标准机制来处理这种情况?或者例如,为了避免它,我必须在更新目标数据库之前使用 Kafka Streams API 并将这两个主题合并为一个(在 transactionId 上)?

【问题讨论】:

【参考方案1】:

您正确描述了默认行为。

如果您想使用一个事务将多条记录写入接收器数据库,您可能必须使用定制的消费者应用程序,该应用程序在内部缓冲源自一个事务的事件并将它们一次写入接收器数据库在单笔交易中。简单地将事件集中在一个主题上对您没有帮助,因为通用接收器连接器仍然不知道源事务边界。

您可以使用 Debezium 的 transaction metadata 主题。它提供了此类消费者应用程序实现此类缓冲逻辑所需的所有信息。不幸的是,我们还没有一个全面的演示,但我希望我们能尽快在博客上介绍这个。

【讨论】:

感谢您的回答。请让我总结一下我对你所说的话的理解。我需要实现定制的应用程序,它将监听参与我的业务事务的表的所有 Kafka 主题,并在某个缓冲区(在内存或数据库中)中收集我身边某处的所有事件。我还需要在 Debezium 的事务元数据主题中监听事件,因此我会知道每个源事务边界(事件编号等)。 一旦我了解到我已经在本地收集了特定事务的所有事件,我将在单个事务的范围内将这些更改复制到我的接收器数据库。并且将为我在 Debezium 的事务元数据主题中的每个事务重复此步骤。我说的对吗? 如果是这样,Debezium 将在第一次执行后尝试加载源数据库中现有数据的初始快照怎么样? Debezium 是否会在源数据库中的现有数据上遵循这样的场景? 前两个 cmets 是对的。重新快照,TX 当前未在 TX 元数据主题中表示(我们已计划添加该主题)。 source 更改事件块中有一个标记,它告诉您这是否是最后一个快照事件。这样一来,您应该能够等待所有快照事件,但在一次写出所有事件时,您可能会遇到一个大事务。 是的,快照是通过 JDBC 读取的,即 WAL 格式对此没有任何影响。您甚至可以(仅)使用 PG 9.2 拍摄快照。

以上是关于PostgreSQL 和事务上的 Debezium CDC的主要内容,如果未能解决你的问题,请参考以下文章

如何使用Debezium从MS SQL中将250张表导入Kafka

Embedded-Debezium spring boot postgreSQL DB 连接问题

您通常如何处理数据库事务日志?

Debezium Kafka Connect需要重启才能使PostgreSQL cdc正常工作

使用 debezium 链接 postgresql 11 无法获取数据库测试的编码

将 ISO8601 字符串隐式转换为 Debezium 的 TIMESTAMPTZ (postgresql)