Kafka Connect JDBC Sink 连接器:如何删除没有 NULL 值的记录?

Posted

技术标签:

【中文标题】Kafka Connect JDBC Sink 连接器:如何删除没有 NULL 值的记录?【英文标题】:Kafka Connect JDBC Sink Connector: How to delete a record that doesn't have a NULL value? 【发布时间】:2020-06-19 12:42:21 【问题描述】:

是否有(推荐的)方法从记录值不为 NULL 的 Kafka Connect JDBC Sink 连接器中删除记录?

例如,如果我的 JSON 配置包含以下内容:

...
"delete.enabled": "true",
"pk.mode": "record_key",
...

而且我的记录的值是非空的,有没有办法在数据库中删除该记录?

我问是因为记录的值有一个字段来标记它是否应该被删除,即像“Operation”这样的列,其中“Operation”==“D”应该是通过 JDBC 在数据库中删除。

如果有标准/推荐的方法来做到这一点,我很想听听。我唯一的另一个想法是进行自定义转换,检查“操作”列的值“D”,如果匹配,我们将 PK 完整但值设置为 NULL 的记录传回,即墓碑记录这应该被连接器作为删除操作拾取。有这种可能吗?

感谢您的帮助,谢谢!

【问题讨论】:

【参考方案1】:

还没有回复,但我的解决方案有点老套:

创建了一个自定义转换,如果满足特定条件(在我的情况下,这是检查记录值中的字段),则将记录的值设置为 NULL(创建墓碑记录) 如果条件不满足,转换返回原始记录 打包到 JAR 中 在“plugin.path”上提供了 JAR 确保 "delete.enabled":"true" 和 "pk.mode":"record_key" 以便实际删除墓碑记录 发送 POST 请求以实例化连接器时,在 POST 正文中包含转换和任何相关配置

希望对你有帮助

【讨论】:

太酷了。我有同样的问题,你能分享你的代码如何实现它吗?谢谢。 您好 bmoe24x,如果您不介意,请与我们分享您的示例。谢谢。

以上是关于Kafka Connect JDBC Sink 连接器:如何删除没有 NULL 值的记录?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Kafka Connect API JDBC Sink 连接器示例到 Oracle 数据库的 Kafka 主题

Kafka Connect JDBC Sink 连接器:如何删除没有 NULL 值的记录?

kafka jdbc sink连接器抛出org.apache.kafka.connect.errors.DataException(结构模式的字段名称未正确指定)插入PG表

Kafka Connect JDBC Sink - 一个接收器配置中每个主题(表)的 pk.fields

Kafka Connect - JSON 转换器 - JDBC Sink 连接器 - 列类型 JSON

如何使用debezium更改数据捕获在mysql中捕获数据并在kafka connect中使用jdbc sink?