Kafka Connect 使用数组字段展平 postgres 记录的转换

Posted

技术标签:

【中文标题】Kafka Connect 使用数组字段展平 postgres 记录的转换【英文标题】:Kafka Connect flatten transformation of a postgres record with array field 【发布时间】:2020-08-05 04:35:52 【问题描述】:

我有一个使用 Kafka Connect 连接到 Kafka 的 postgres 数据库,以便将 CDC 事件放在一个主题上。 我们使用扁平化转换作为共享配置的一部分:

flattenKey:  "org.apache.kafka.connect.transforms.Flatten$Key"

表中的一列是 ARRAY 类型,因此在尝试应用转换时出现异常:

Flatten transformation does not support ARRAY for record without schemas (for field after.role_ids).

Kafka 连接参考:https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java#L246

我知道数组不能展平,为什么,我的问题是 - 有什么办法可以保持记录的展平,但以某种方式转换/转换数组,以便我仍然可以使用它?

该数组将包含整数,因此将其转换为一个字符串,其中所有元素都用逗号分隔,例如对我有用。 任何其他建议都会很棒。

我们使用 Debezium 进行配置。

【问题讨论】:

你的有效载荷是什么样子的? 【参考方案1】:

您需要编写自己的代码来处理这个问题,可以是 custom Single Message Transform,也可以是 Kafka Streams 之类的流处理器。

您还可以在this issue 上发表评论/投票,以将有助于解决此问题的函数添加到 ksqlDB 中。

【讨论】:

【参考方案2】:

如果您使用的是 Debezium 1.1 或更高版本,我建议您使用 custom column converter 而不是 SMT。转换器让您可以直接在 Debezium 内部调整架构和值,因此“将其转换为所有元素用逗号分隔的字符串”只需一点编码即可完成。

【讨论】:

【参考方案3】:

在 JDBC 接收器连接器中,数组原语通过票证 https://github.com/confluentinc/kafka-connect-jdbc/pull/805 处理

【讨论】:

以上是关于Kafka Connect 使用数组字段展平 postgres 记录的转换的主要内容,如果未能解决你的问题,请参考以下文章

如何重命名/替换 Kafka-connect SMT 结构中的字段?

在powershell中展平数组

如何通过 Debezium Connect 反序列化来自 Kafka 消息流的几何字段?

即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败

kafka jdbc sink连接器抛出org.apache.kafka.connect.errors.DataException(结构模式的字段名称未正确指定)插入PG表

Kafka Connect有没有办法忽略模式文件中的1个字段并将其他字段读入数据库