如何解决配置中设置的 Kafka JDBC Sink 连接器中与 TopicRecordName 与 TopicNameStrategy 的冲突
Posted
技术标签:
【中文标题】如何解决配置中设置的 Kafka JDBC Sink 连接器中与 TopicRecordName 与 TopicNameStrategy 的冲突【英文标题】:How to resolve conflicts with TopicRecordName vs TopicNameStrategy in Kafka JDBC Sink Connector set in configurations 【发布时间】:2021-12-14 20:52:33 【问题描述】:我目前有两个独立的架构注册表链接
1) https://schema.dev/subjects/session-value/versions (original)
2) https://schema.dev/subjects/reservation-io.models.avro.Reservation/versions (new)
我有两个工人和连接器 由于数据库中使用了不同的主题,因此我部署了这两个工作人员。
我读到我们可以使用value.converter.value.subject.name.strategy=io.confluent.kafka.serializers.subject.TopicRecordNameStrategy
如果需要,可以链接到新链接。
所以我将该配置放入其中一个工作文件,而另一个工作文件我没有,只是让默认的 TopicNameStrategy 读取原始链接。
但是,当我将这些主题部署到指定环境时,我经常看到 #1 链接找不到该主题的 TopicRecordName 架构链接的错误
例如,会话主题应该是从 #1 的模式中读取的,但它却说它正在尝试查找 2) https://schema.dev/subjects/session-io.models.avro.Session/versions
这意味着它正在从不正确的工作文件中读取。
这是错误代码:
Caused by org.apache.kafka.common.errors.SerializationException: Error retrieving Avro schema id 11
Caused by io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Subject 'session-io.models.avro.Session' not found.; error code404
at io.confluent.kafka.avro.AvroConverter$Deserializer.deserialize
at io.confluent.kafka.avro.AvroConverter.toConnectData
连接器和工作文件都从端口 8084 读取,该端口处于分布式模式。因此,当 podA 和 podB 在其指定的 pod 内有 connectorA 和 connectorB 时,我们会看到 podA 有运行错误工作文件的 connectorB。有没有办法只让 connectorA 运行默认的 TopicNameStrategy,而 connectorB 运行 TopicRecordNameStrategy 配置。我阅读了所有包含此问题的帖子,但没有解决方法。我不断重新启动 pod,并且不断出现此错误。我应该将端口号更改为其他内容还是 group_id?任何建议。
生产者设置为仅从 Kafka Streams 生成消息
【问题讨论】:
【参考方案1】:在分布式模式下,您无法控制哪个任务获取哪个配置。因此,所有工作人员都应该具有相同的配置
请记住,连接器的转换器属性优先于工作者的
【讨论】:
但是目前我在我的配置之一中将 value.converter.value.subject.name.strategy 设置为 topicRecordNameStrategy 并将 value.converter 作为 AvroConverter,而将 key.converter 作为 StringConverter。我看到当我使用 topicRecordNameStrategy 时,它会默认接管 topicRecordStrategy。是否可以更改端口,其中一个端口使用 topicRecordNameStrategy,而另一个端口使用默认值,因此不会冲突 我不明白你所说的“端口”是什么意思。 Connect REST API 和 Schema Registry 各只使用一个端口,更改它并不能解决您的问题。 我想知道我是否在 1 个端口中使用了 TopicRecordName,并且我还在同一个端口中使用了默认值,这可能会导致冲突,不知道是使用默认值还是 TopicRecordName,这就是我的意思看到。那么是否可以在其他 1 个端口中使用默认值并在第二个端口中使用 TopicRecordName 以避免冲突 再说一次,我不明白你所说的“端口”是什么意思,所以我要说,不,这是不可能的。连接集群平均分配任务。您必须运行单独的连接 进程(两个唯一的集群)来分隔不同的 REST 服务器端口。这意味着单独的消费者群体,因此重复的数据以上是关于如何解决配置中设置的 Kafka JDBC Sink 连接器中与 TopicRecordName 与 TopicNameStrategy 的冲突的主要内容,如果未能解决你的问题,请参考以下文章
如何让 Jersey 客户端使用 HttpsURLConnection 中设置的 defaultSSLSocketFactory?