Kafka Connect JDBC 接收器连接器

Posted

技术标签:

【中文标题】Kafka Connect JDBC 接收器连接器【英文标题】:Kafka Connect JDBC Sink Connector 【发布时间】:2019-06-17 10:01:11 【问题描述】:

我正在尝试将主题中的数据(json 数据)写入 mysql 数据库。我相信我想要一个 JDBC Sink 连接器。

如何配置连接器,将主题中的json数据映射到如何向数据库中插入数据。

我能找到的唯一文档就是这个。

“接收器连接器需要模式知识,因此您应该使用 合适的转换器,例如Schema 自带的 Avro 转换器 注册表或启用了模式的 JSON 转换器。卡夫卡唱片 键(如果存在)可以是原始类型或 Connect 结构,并且 记录值必须是一个 Connect 结构。从中选择的字段 连接结构必须是原始类型。如果主题中的数据 不是兼容的格式,实现自定义转换器可能是 必须的。”

但是如何配置呢?有什么例子吗?

【问题讨论】:

看看这个:ibm.com/support/knowledgecenter/en/SSPT3X_4.2.5/… 和 blog.magiclyde.me/post/… @SRJ,所以json字段的key需要匹配db表中的一个列名? 是的,根据您的架构。在这里查看例如docs.confluent.io/current/connect/kafka-connect-jdbc/… @SRJ,所以我认为这意味着您需要使用 Confluent Schema Registry? 【参考方案1】:

我认为这意味着您需要使用 Confluent Schema Registry?

对于“更好”的架构支持,是的。但不,这不是必需的。

您可以将 JsonConverter 与 schemas.enable=true 一起使用

您的 JSON 消息需要看起来像这样,


   "schema" : 
      ... data that describes the payload
   , 
   "payload": 
      ... your actual data
   

这个格式的参考可以看this blog

您可以使用 Kafka Streams or KSQL 更轻松地将“无模式”JSON 转换为 schema-d Avro 有效负载

【讨论】:

也可以创建自己的 Converter 来创建带有 Schema 的 Record ,并像其他 Kafka Connects 插件(Connectors、Transform)一样注册它。 @wardziniak 有没有你知道的可以添加到schema.literal 的转换,比如说描述一个“原始”JSON 有效负载? @wardziniak K. 我想我会在有时间的时候添加一个 JIRA

以上是关于Kafka Connect JDBC 接收器连接器的主要内容,如果未能解决你的问题,请参考以下文章

kafka-connect JDBC PostgreSQL Sink Connector 显式定义 PostgrSQL 模式(命名空间)

使用 Kafka KSQL AVRO 表作为 Kafka Connect JDBC Sink 源的问题

带有旧数据库的JDBC Kafka连接器

Confluent Kafka Connect MySQL Sink Connector 的开源替代方案?

Kafka Connect JDBC Sink - 一个接收器配置中每个主题(表)的 pk.fields

pyflink消费kafka-connect-jdbc消息(带schema)