Kafka Connect 中的静态成员资格
Posted
技术标签:
【中文标题】Kafka Connect 中的静态成员资格【英文标题】:Static membership in Kafka Connect 【发布时间】:2021-08-25 06:35:38 【问题描述】:我正在尝试在 Kafka Connect 中实现静态成员资格。 我们的 Kafka Connect 集群使用 Strimzi Kafka 算子部署在 K8S 上。
我已尝试为工作人员(在 KafkaConnect yaml 中)放置以下配置:
connector.client.config.override.policy: All
consumer.group.instance.id: somethingsomething
在 HttpSinkConnector 类中我有这个:
@Override
public List<Map<String, String>> taskConfigs(int maxTasks)
List<Map<String, String>> configs = new ArrayList<>(maxTasks);
for (int i = 0; i < maxTasks; i++)
Map<String, String> configCopy = new HashMap<>(this.configProps);
configCopy.put("consumer.override.group.instance.id", Thread.currentThread().getName());
configs.add(configCopy);
return configs;
这给了 org.apache.kafka.common.errors.FencedInstanceIdException 一些日志 - ...07:07:32,631 错误 [Consumer instanceId=somethingsomething... 因为所有任务都得到了一些东西作为他们的 group.instance.id 虽然它应该有 Thread.currentThread().getName()。
我也尝试了以下方法(没有工人配置):
@Override
public List<Map<String, String>> taskConfigs(int maxTasks)
List<Map<String, String>> configs = new ArrayList<>(maxTasks);
for (int i = 0; i < maxTasks; i++)
Map<String, String> configCopy = new HashMap<>(this.configProps);
configCopy.put("consumer.group.instance.id", "somethingsomething");
configs.add(configCopy);
return configs;
这什么也没做(没有错误,日志中没有 instanceId),这意味着我把这个配置值放在了错误的地方。
那么如何在 Kafka Connect 上实现静态成员资格?
【问题讨论】:
【参考方案1】:使用 consumer.override.group.instance.id 而不是 consumer.group.instance.id
【讨论】:
这是问题中描述的第一次尝试... 我相信你指的是connector.client.config.override.policy: All ,我假设你必须在kafka connect配置中进行配置。那是正确的。我指的是在您的连接器配置中使用 consumer.override.group.instance.id。而不是这个 => consumer.group.instance.id: somethingsomething 使用 => consumer.override.group.instance.id: somethingsomething 不会按照您建议的方式配置连接器,这将使所有任务都使用相同的 consumer.group.instance.id 进行初始化?我需要每个任务(==consumer)都有一个唯一的 consumer.group.instance.id以上是关于Kafka Connect 中的静态成员资格的主要内容,如果未能解决你的问题,请参考以下文章