如何根据连接器名称获取 Kafka 源连接器架构
Posted
技术标签:
【中文标题】如何根据连接器名称获取 Kafka 源连接器架构【英文标题】:How to fetch Kafka source connector schema based on connector name 【发布时间】:2019-04-24 19:24:31 【问题描述】:我正在使用 Confluent JDBC Kafka 连接器将消息发布到主题中。源连接器将在每次轮询时将数据连同模式一起发送到主题。我想检索这个架构。
有可能吗?如何?谁能推荐我
我的目的是根据 Kafka 连接器在轮询时构建的模式创建 KSQL 流或表。
【问题讨论】:
【参考方案1】:最好的方法是使用 Avro,其中架构是单独存储的,并由 Kafka Connect 和 KSQL 自动使用。
您可以通过将 Kafka Connect 配置为使用 AvroConverter 来使用 Avro。在您的 Kafka Connect 工作器配置集中:
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://schema-registry:8081
(将 schema-registry
更新为运行 Schema Registry 的主机名)
从那里开始,在 KSQL 中您只需使用
CREATE STREAM my_stream WITH (KAFKA_TOPIC='source_topic', VALUE_FORMAT='AVRO');
您无需在此处指定模式本身,因为 KSQL 会从模式注册表中获取它。
您可以阅读更多关于转换器和序列化程序here 的信息。
免责声明:我为 Confluent 工作,并撰写了引用的博客文章。
【讨论】:
我看到了你的帖子。但我在这里有两个问题。 1. 我正在使用 confluent JDBC 连接器,它以分隔格式发送记录。它还在工作吗? 2. 如果架构更新了会发生什么。 KSQL 是否能够引用更新后的架构? 第一个问题的答案是肯定的,但请确认。请回答第二个问题 1. JDBC Connector使用的是Kafka Connect API,它支持多种转换器,包括JSON、Avro等,所以不是“JDBC连接器以分隔格式发送记录”的情况。它可以配置发送数据的格式。 2.当您针对主题创建新流时,KSQL 应该拉取最新版本的架构,是的。以上是关于如何根据连接器名称获取 Kafka 源连接器架构的主要内容,如果未能解决你的问题,请参考以下文章
使用 azure 事件中心源连接器时以字节形式返回的 Avro 架构