Kafka Connect 中的静态成员资格

Posted

技术标签:

【中文标题】Kafka Connect 中的静态成员资格【英文标题】:Static membership in Kafka Connect 【发布时间】:2021-08-25 06:35:38 【问题描述】:

我正在尝试在 Kafka Connect 中实现静态成员资格。 我们的 Kafka Connect 集群使用 Strimzi Kafka 算子部署在 K8S 上。

我已尝试为工作人员(在 KafkaConnect yaml 中)放置以下配置:

connector.client.config.override.policy: All
consumer.group.instance.id: somethingsomething

在 HttpSinkConnector 类中我有这个:

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) 
    List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    for (int i = 0; i < maxTasks; i++) 
        Map<String, String> configCopy = new HashMap<>(this.configProps);
        configCopy.put("consumer.override.group.instance.id", Thread.currentThread().getName());

        configs.add(configCopy);
    
    return configs;

这给了 org.apache.kafka.common.errors.FencedInstanceIdException 一些日志 - ...07:07:32,631 错误 [Consumer instanceId=somethingsomething... 因为所有任务都得到了一些东西作为他们的 group.instance.id 虽然它应该有 Thread.currentThread().getName()。

我也尝试了以下方法(没有工人配置):

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) 
    List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    for (int i = 0; i < maxTasks; i++) 
        Map<String, String> configCopy = new HashMap<>(this.configProps);
        configCopy.put("consumer.group.instance.id", "somethingsomething");

        configs.add(configCopy);
    
    return configs;

这什么也没做(没有错误,日志中没有 instanceId),这意味着我把这个配置值放在了错误的地方。

那么如何在 Kafka Connect 上实现静态成员资格?

【问题讨论】:

【参考方案1】:

使用 consumer.override.group.instance.id 而不是 consumer.group.instance.id

【讨论】:

这是问题中描述的第一次尝试... 我相信你指的是connector.client.config.override.policy: All ,我假设你必须在kafka connect配置中进行配置。那是正确的。我指的是在您的连接器配置中使用 consumer.override.group.instance.id。而不是这个 => consumer.group.instance.id: somethingsomething 使用 => consumer.override.group.instance.id: somethingsomething 不会按照您建议的方式配置连接器,这将使所有任务都使用相同的 consumer.group.instance.id 进行初始化?我需要每个任务(==consumer)都有一个唯一的 consumer.group.instance.id

以上是关于Kafka Connect 中的静态成员资格的主要内容,如果未能解决你的问题,请参考以下文章

c# 静态成员和实例成员的区别

静态成员

java里的静态成员变量是放在了堆内存还是栈内

Java中static关键字和代码块的学习

mfc 类静态成员

静态方法和实例方法