Kafka JDBC Sink 句柄数组数据类型

Posted

技术标签:

【中文标题】Kafka JDBC Sink 句柄数组数据类型【英文标题】:Kafka JDBC Sink handle array datatype 【发布时间】:2021-05-27 09:56:24 【问题描述】:

我知道 Kafka JDBC Sink 连接器对于数组数据类型有一些缺点。但是,是否可以将 Sink 连接器与支持数组数据类型的简单 Kafka 连接器结合使用。如何从 Kafka 配置中过滤并将它们切换为简单的 Kafka 连接器配置 简单的 Kafka 配置是什么意思? Kafka Connect 如何支持数组字段

name: topic_name
type: array
item: Topic file

这是否可能以字符串而不是数组的形式消耗到数据库中

"fields":[
  "name":"item_id",
  "type":
     "type":"array",
     "items":["null", "string"]
  ,
"default":[]
]

【问题讨论】:

【参考方案1】:

Kafka Connect 框架本身并没有暴露类型的限制,它在 JDBC 接收器的源代码中拒绝了数组。

有一个出色的 PR 支持 Postgres - https://github.com/confluentinc/kafka-connect-jdbc/pull/805

不清楚您所说的“简单”是什么意思,但如果您想使用不同的连接器,则需要安装它,然后更改类。例如,也许 MongoDB 接收器处理数组。我知道 S3 和 HDFS 接收器确实...

是否可以将 Sink 连接器与简单的 Kafka 连接器结合使用

同样,不确定您的意思是什么,但连接器通常不会“链接在一起”。虽然您可以使用带有转换的 MirrorMaker2 来有效地完成与 Kafka Streams 相同的工作,但最好使用更合适的工具来实现这一点

这可能会以字符串而不是数组的形式消耗到数据库

当然,如果消息字段实际上是一个字符串。正如建议的那样,您需要在接收器连接器使用它之前处理消息

【讨论】:

所以我尝试使用 Debezium sink 连接器,因为我读到它支持它,但是当我部署它时,我收到一个 500 错误,需要一些连接器类来支持 Debezium 并且 JDBC Sink 是唯一的选择。是否有更多配置,例如在配置的 Dockerfile 或 yaml 文件/worker 属性中使其工作***.com/questions/66893323/… 是否可以提取数组字段并让其他字段读入数据库 Debezium 是一个源,而不是一个接收器,并且您的错误与连接无关,而是您命名的 docker 映像 我明白了,我将 docker 镜像命名为 img.dev/kafka/kafka-sink-connect:TAG,所以我需要对 Docker 和 worker 文件进行更多配置来支持它。我尝试添加一个 HOIST,然后将 SMT 展平为 Kafka 文档docs.confluent.io/platform/current/connect/transforms/…,因为它说使用 Struct 包装数据并破坏字段,我让它运行但 JDBC Sink 没有数据进入。关于为什么的任何建议?当我尝试 HoiseField$Value 时,我看到 INFO Setting offset for partition to topic 如果我将字段从数组类型更改为结构,并且我使用 HOIST 并展平 SMT,它将在 db 中读取 根据文档,Flatten 只适用于 Structs 和 maps,它不会遍历数组。您需要查看日志以了解未写入数据的原因(或尝试运行哪些查询)

以上是关于Kafka JDBC Sink 句柄数组数据类型的主要内容,如果未能解决你的问题,请参考以下文章

无法运行JDBC sink将数据从Kafka移动到MS SQL Server

无法使用 JDBC kafka-sink-connector 将 kafka 主题数据写入 postgres DB

使用 Kafka Connect API JDBC Sink 连接器示例到 Oracle 数据库的 Kafka 主题

如何使用 FME 处理 Kafka JDBC Sink 连接器

提取和转换 jdbc sink 连接器的 kafka 消息特定字段

为jdbc sink连接器提取和转换kafka消息的特定字段。