Kafka Connect 不适用于主题策略

Posted

技术标签:

【中文标题】Kafka Connect 不适用于主题策略【英文标题】:Kafka Connect not working with Subject Strategies 【发布时间】:2019-04-30 11:47:59 【问题描述】:

上下文

我编写了几个小的Kafka Connect 连接器。一个每秒只生成随机数据,另一个将其记录在控制台中。它们与Schema Registry 集成,因此数据使用Avro 进行序列化。

我使用 fast-data-dev Docker image provided by Landoop 将它们部署到本地 Kafka 环境中

基本设置有效并每秒生成一条记录的消息

但是,我想更改subject name strategy。默认生成两个主题:

$topic-key $topic-value

根据我的用例,我需要生成具有不同模式的事件,这些事件最终会出现在同一个主题上。因此,我需要的主题名称是:

$topic-$keyRecordName $topic-$valueRecordName

根据the docs,我的需求适合TopicRecordNameStrategy

我尝试了什么

我创建了avroData 对象用于发送值以进行连接:

class SampleSourceConnectorTask : SourceTask() 

    private lateinit var avroData: AvroData 

    override fun start(props: Map<String, String>) 
        [...]
        avroData = AvroData(AvroDataConfig(props))
    

然后使用它来创建SourceRecord 响应对象

The documentation 声明为了在 Kafka Connect 中使用模式注册表,我必须在连接器配置中设置一些属性。因此,当我创建它时,我会添加它们:

name=SampleSourceConnector
connector.class=[...]
tasks.max=1
key.converter=io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url=http://localhost:8081
value.converter=io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url=http://localhost:8081
key.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy

问题

连接器似乎忽略了这些属性并继续使用旧的$topic-key$topic-value 主题。

问题

Kafka Connect 应该支持不同的主题策略。我通过编写自己的AvroConverter 版本并硬编码主题策略是我需要的策略,设法解决了这个问题。但是,这看起来不是一个好方法,并且在尝试使用 Sink Kafka 连接器使用数据时也带来了问题。我复制了主题,所以有一个旧版本的版本 ($topic-key),它可以工作

为 Kafka Connect 指定主题策略的正确设置是什么?

【问题讨论】:

【参考方案1】:

您缺少 key.convertervalue.converter 前缀,以便将配置传递给转换器。所以而不是:

key.subject.name.strategy
value.subject.name.strategy

你想要的:

key.converter.key.subject.name.strategy
value.converter.value.subject.name.strategy

来源https://docs.confluent.io/current/connect/managing/configuring.html

要将配置参数传递给键和值转换器,请在它们前面加上 key.converter.value.converter.,就像在工作配置中定义默认转换器时一样。请注意,这些仅在 key.convertervalue.converter 属性中指定了相应的转换器配置时使用。

【讨论】:

你是对的。我在文档中没有找到与此相关的任何内容(可能我没有充分挖掘)。您是否有指向某些文档的链接?这将完成答案 编辑问题以包括参考 对于上下文,这直到 Confluent 4.1.3,AFAIK 才起作用。 github.com/confluentinc/schema-registry/pull/801

以上是关于Kafka Connect 不适用于主题策略的主要内容,如果未能解决你的问题,请参考以下文章

Kafka Connect - 不适用于更新操作

Kafka Connect:一个接收器连接器,用于从一个主题写入多个表

一文读懂Kafka Connect核心概念

用于复制的 Kafka 设置策略?

kafka connect - 审计 - 在任务完成时触发事件

使用独立模式 Kafka-connect 将 Postgresql 的数据捕获更改为 kafka 主题