flink kafka消费者组ID不起作用

Posted

技术标签:

【中文标题】flink kafka消费者组ID不起作用【英文标题】:flink kafka consumer groupId not working 【发布时间】:2016-12-03 00:03:25 【问题描述】:

我正在使用带有 flink 的 kafka。 在一个简单的程序中,我使用了 flinks FlinkKafkaConsumer09,并为其分配了 group id。

根据 Kafka 的行为,当我在具有相同 group.Id 的同一主题上运行 2 个消费者时,它应该像消息队列一样工作。我认为它应该像这样工作: 如果向 Kafka 发送 2 条消息,则每个或一个 flink 程序将总共处理 2 条消息两次(假设总共 2 行输出)。

但实际结果是,每个程序会收到2条消息。

我尝试使用随 kafka 服务器下载一起提供的消费者客户端。它以记录的方式工作(处理了 2 条消息)。 我尝试在 flink 程序的同一个 Main 函数中使用 2 个 kafka 消费者。总共处理了 4 条消息。 我还尝试运行 2 个 flink 实例,并为每个实例分配相同的 kafka consumer 程序。 4 条消息。

有什么想法吗? 这是我期望的输出:

1> Kafka and Flink2 says: element-65  
2> Kafka and Flink1 says: element-66 

这是我总是得到错误的输出:

1> Kafka and Flink2 says: element-65  
1> Kafka and Flink1 says: element-65  
2> Kafka and Flink2 says: element-66  
2> Kafka and Flink1 says: element-66 

下面是这段代码:

public static void main(String[] args) throws Exception 

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    ParameterTool parameterTool = ParameterTool.fromArgs(args);

    DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));

    messageStream.rebalance().map(new MapFunction<String, String>() 
        private static final long serialVersionUID = -6867736771747690202L;

        @Override
        public String map(String value) throws Exception 
            return "Kafka and Flink1 says: " + value;
        
    ).print();


    env.execute();

我曾尝试运行它两次,也以另一种方式运行: 在 Main 函数中为每个数据流创建 2 个数据流和 env.execute()。

【问题讨论】:

我也试过了,在 flink 外部使用 kafka 客户端运行 2 个消费者,在 flink 实例中使用 flink-kafka-connector 运行 2 个消费者。外面的2个消费者似乎工作正常,总共2个。但是 flink 内部的另外 2 个似乎是独立工作的(对外部,也对彼此),它们每个收到 2 条消息,所以总共 4 条。 【参考方案1】:

今天在 Flink 用户邮件列表上有一个非常相似的问题,但是我在这里找不到发布它的链接。所以这里有一部分答案:

“在内部,Flink Kafka 连接器不使用消费者组 管理功能,因为他们使用较低级别的 API (0.8 中的 SimpleConsumer 和 0.9 中的 KafkaConsumer#assign(...)) 并行实例以更好地控制单个分区 消费。所以,本质上,Flink 中的“group.id”设置 Kafka 连接器仅用于将偏移提交回 ZK / Kafka 经纪人。”

也许这可以为您澄清事情。

此外,还有一篇关于使用 Flink 和 Kafka 的博客文章可能会对您有所帮助 (https://data-artisans.com/blog/kafka-flink-a-practical-how-to)。

【讨论】:

非常感谢。我终于检查了 flink 连接器源代码。它确实使用了不同于常规 kafka 客户端的另一种消息处理途径。 顺便说一句,找到了邮件列表问题的链接:apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/…【参考方案2】:

由于flink kafka消费者的group.id除了向zookeeper提交offset之外没有太多用处。就 flink kafka 消费者而言,是否有任何偏移量监控方式。我可以看到有一种方法 [在 consumer-groups/consumer-offset-checker 的帮助下] 适用于控制台消费者,但不适用于 flink kafka 消费者。

我们想看看我们的 flink kafka 消费者如何落后/落后于 kafka 主题大小[给定时间点主题中的消息总数],可以在分区级别进行。

【讨论】:

这是答案还是问题?

以上是关于flink kafka消费者组ID不起作用的主要内容,如果未能解决你的问题,请参考以下文章

春季云流-消费者配置不起作用

Mat到Byte []的转换在java中不起作用

Spring stomp over websocket SubscribeMapping 不起作用

为啥通知组在android中不起作用?我做错了啥?

s-s-rS 订阅电子邮件组(分发列表)不起作用

将 Kafka 连接嵌入 Ksqldb-server 时挂载(卷)不起作用