如何根据连接器名称获取 Kafka 源连接器架构

Posted

技术标签:

【中文标题】如何根据连接器名称获取 Kafka 源连接器架构【英文标题】:How to fetch Kafka source connector schema based on connector name 【发布时间】:2019-04-24 19:24:31 【问题描述】:

我正在使用 Confluent JDBC Kafka 连接器将消息发布到主题中。源连接器将在每次轮询时将数据连同模式一起发送到主题。我想检索这个架构。

有可能吗?如何?谁能推荐我

我的目的是根据 Kafka 连接器在轮询时构建的模式创建 KSQL 流或表。

【问题讨论】:

【参考方案1】:

最好的方法是使用 Avro,其中架构是单独存储的,并由 Kafka Connect 和 KSQL 自动使用。

您可以通过将 Kafka Connect 配置为使用 AvroConverter 来使用 Avro。在您的 Kafka Connect 工作器配置集中:

key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081

schema-registry 更新为运行 Schema Registry 的主机名

从那里开始,在 KSQL 中您只需使用

CREATE STREAM my_stream WITH (KAFKA_TOPIC='source_topic', VALUE_FORMAT='AVRO');

您无需在此处指定模式本身,因为 KSQL 会从模式注册表中获取它。

您可以阅读更多关于转换器和序列化程序here 的信息。

免责声明:我为 Confluent 工作,并撰写了引用的博客文章。

【讨论】:

我看到了你的帖子。但我在这里有两个问题。 1. 我正在使用 confluent JDBC 连接器,它以分隔格式发送记录。它还在工作吗? 2. 如果架构更新了会发生什么。 KSQL 是否能够引用更新后的架构? 第一个问题的答案是肯定的,但请确认。请回答第二个问题 1. JDBC Connector使用的是Kafka Connect API,它支持多种转换器,包括JSON、Avro等,所以不是“JDBC连接器以分隔格式发送记录”的情况。它可以配置发送数据的格式。 2.当您针对主题创建新流时,KSQL 应该拉取最新版本的架构,是的。

以上是关于如何根据连接器名称获取 Kafka 源连接器架构的主要内容,如果未能解决你的问题,请参考以下文章

到 Kafka 的 AWS Sqs 源连接器

kafka JDBC源连接器无法获取postgres表

使用 azure 事件中心源连接器时以字节形式返回的 Avro 架构

在跟踪集合上的更新操作时,如何在使用 kafka mongodb 源连接器时获取完整文档?

重新启动 kafka 连接接收器和源连接器以从头开始读取

kafka s3 sink连接器在获取NULL数据时崩溃