Kafka Connect 使用数组字段展平 postgres 记录的转换
Posted
技术标签:
【中文标题】Kafka Connect 使用数组字段展平 postgres 记录的转换【英文标题】:Kafka Connect flatten transformation of a postgres record with array field 【发布时间】:2020-08-05 04:35:52 【问题描述】:我有一个使用 Kafka Connect 连接到 Kafka 的 postgres 数据库,以便将 CDC 事件放在一个主题上。 我们使用扁平化转换作为共享配置的一部分:
flattenKey: "org.apache.kafka.connect.transforms.Flatten$Key"
表中的一列是 ARRAY 类型,因此在尝试应用转换时出现异常:
Flatten transformation does not support ARRAY for record without schemas (for field after.role_ids).
Kafka 连接参考:https://github.com/a0x8o/kafka/blob/master/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/Flatten.java#L246
我知道数组不能展平,为什么,我的问题是 - 有什么办法可以保持记录的展平,但以某种方式转换/转换数组,以便我仍然可以使用它?
该数组将包含整数,因此将其转换为一个字符串,其中所有元素都用逗号分隔,例如对我有用。 任何其他建议都会很棒。
我们使用 Debezium 进行配置。
【问题讨论】:
你的有效载荷是什么样子的? 【参考方案1】:您需要编写自己的代码来处理这个问题,可以是 custom Single Message Transform,也可以是 Kafka Streams 之类的流处理器。
您还可以在this issue 上发表评论/投票,以将有助于解决此问题的函数添加到 ksqlDB 中。
【讨论】:
【参考方案2】:如果您使用的是 Debezium 1.1 或更高版本,我建议您使用 custom column converter 而不是 SMT。转换器让您可以直接在 Debezium 内部调整架构和值,因此“将其转换为所有元素用逗号分隔的字符串”只需一点编码即可完成。
【讨论】:
【参考方案3】:在 JDBC 接收器连接器中,数组原语通过票证 https://github.com/confluentinc/kafka-connect-jdbc/pull/805 处理
【讨论】:
以上是关于Kafka Connect 使用数组字段展平 postgres 记录的转换的主要内容,如果未能解决你的问题,请参考以下文章
如何重命名/替换 Kafka-connect SMT 结构中的字段?
如何通过 Debezium Connect 反序列化来自 Kafka 消息流的几何字段?
即使json数据包含架构和有效负载字段,kafka connect hdfs sink连接器也会失败
kafka jdbc sink连接器抛出org.apache.kafka.connect.errors.DataException(结构模式的字段名称未正确指定)插入PG表