Kafka Connect JDBC 接收器连接器
Posted
技术标签:
【中文标题】Kafka Connect JDBC 接收器连接器【英文标题】:Kafka Connect JDBC Sink Connector 【发布时间】:2019-06-17 10:01:11 【问题描述】:我正在尝试将主题中的数据(json 数据)写入 mysql 数据库。我相信我想要一个 JDBC Sink 连接器。
如何配置连接器,将主题中的json数据映射到如何向数据库中插入数据。
我能找到的唯一文档就是这个。
“接收器连接器需要模式知识,因此您应该使用 合适的转换器,例如Schema 自带的 Avro 转换器 注册表或启用了模式的 JSON 转换器。卡夫卡唱片 键(如果存在)可以是原始类型或 Connect 结构,并且 记录值必须是一个 Connect 结构。从中选择的字段 连接结构必须是原始类型。如果主题中的数据 不是兼容的格式,实现自定义转换器可能是 必须的。”
但是如何配置呢?有什么例子吗?
【问题讨论】:
看看这个:ibm.com/support/knowledgecenter/en/SSPT3X_4.2.5/… 和 blog.magiclyde.me/post/… @SRJ,所以json字段的key需要匹配db表中的一个列名? 是的,根据您的架构。在这里查看例如docs.confluent.io/current/connect/kafka-connect-jdbc/… @SRJ,所以我认为这意味着您需要使用 Confluent Schema Registry? 【参考方案1】:我认为这意味着您需要使用 Confluent Schema Registry?
对于“更好”的架构支持,是的。但不,这不是必需的。
您可以将 JsonConverter 与 schemas.enable=true
一起使用
您的 JSON 消息需要看起来像这样,
"schema" :
... data that describes the payload
,
"payload":
... your actual data
这个格式的参考可以看this blog
您可以使用 Kafka Streams or KSQL 更轻松地将“无模式”JSON 转换为 schema-d Avro 有效负载
【讨论】:
也可以创建自己的 Converter 来创建带有 Schema 的 Record ,并像其他 Kafka Connects 插件(Connectors、Transform)一样注册它。 @wardziniak 有没有你知道的可以添加到schema.literal
的转换,比如说描述一个“原始”JSON 有效负载?
@wardziniak K. 我想我会在有时间的时候添加一个 JIRA以上是关于Kafka Connect JDBC 接收器连接器的主要内容,如果未能解决你的问题,请参考以下文章
kafka-connect JDBC PostgreSQL Sink Connector 显式定义 PostgrSQL 模式(命名空间)
使用 Kafka KSQL AVRO 表作为 Kafka Connect JDBC Sink 源的问题
Confluent Kafka Connect MySQL Sink Connector 的开源替代方案?