flink kafka消费者组ID不起作用
Posted
技术标签:
【中文标题】flink kafka消费者组ID不起作用【英文标题】:flink kafka consumer groupId not working 【发布时间】:2016-12-03 00:03:25 【问题描述】:我正在使用带有 flink 的 kafka。 在一个简单的程序中,我使用了 flinks FlinkKafkaConsumer09,并为其分配了 group id。
根据 Kafka 的行为,当我在具有相同 group.Id 的同一主题上运行 2 个消费者时,它应该像消息队列一样工作。我认为它应该像这样工作: 如果向 Kafka 发送 2 条消息,则每个或一个 flink 程序将总共处理 2 条消息两次(假设总共 2 行输出)。
但实际结果是,每个程序会收到2条消息。
我尝试使用随 kafka 服务器下载一起提供的消费者客户端。它以记录的方式工作(处理了 2 条消息)。 我尝试在 flink 程序的同一个 Main 函数中使用 2 个 kafka 消费者。总共处理了 4 条消息。 我还尝试运行 2 个 flink 实例,并为每个实例分配相同的 kafka consumer 程序。 4 条消息。
有什么想法吗? 这是我期望的输出:
1> Kafka and Flink2 says: element-65
2> Kafka and Flink1 says: element-66
这是我总是得到错误的输出:
1> Kafka and Flink2 says: element-65
1> Kafka and Flink1 says: element-65
2> Kafka and Flink2 says: element-66
2> Kafka and Flink1 says: element-66
下面是这段代码:
public static void main(String[] args) throws Exception
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
ParameterTool parameterTool = ParameterTool.fromArgs(args);
DataStream<String> messageStream = env.addSource(new FlinkKafkaConsumer09<>(parameterTool.getRequired("topic"), new SimpleStringSchema(), parameterTool.getProperties()));
messageStream.rebalance().map(new MapFunction<String, String>()
private static final long serialVersionUID = -6867736771747690202L;
@Override
public String map(String value) throws Exception
return "Kafka and Flink1 says: " + value;
).print();
env.execute();
我曾尝试运行它两次,也以另一种方式运行: 在 Main 函数中为每个数据流创建 2 个数据流和 env.execute()。
【问题讨论】:
我也试过了,在 flink 外部使用 kafka 客户端运行 2 个消费者,在 flink 实例中使用 flink-kafka-connector 运行 2 个消费者。外面的2个消费者似乎工作正常,总共2个。但是 flink 内部的另外 2 个似乎是独立工作的(对外部,也对彼此),它们每个收到 2 条消息,所以总共 4 条。 【参考方案1】:今天在 Flink 用户邮件列表上有一个非常相似的问题,但是我在这里找不到发布它的链接。所以这里有一部分答案:
“在内部,Flink Kafka 连接器不使用消费者组 管理功能,因为他们使用较低级别的 API (0.8 中的 SimpleConsumer 和 0.9 中的 KafkaConsumer#assign(...)) 并行实例以更好地控制单个分区 消费。所以,本质上,Flink 中的“group.id”设置 Kafka 连接器仅用于将偏移提交回 ZK / Kafka 经纪人。”
也许这可以为您澄清事情。
此外,还有一篇关于使用 Flink 和 Kafka 的博客文章可能会对您有所帮助 (https://data-artisans.com/blog/kafka-flink-a-practical-how-to)。
【讨论】:
非常感谢。我终于检查了 flink 连接器源代码。它确实使用了不同于常规 kafka 客户端的另一种消息处理途径。 顺便说一句,找到了邮件列表问题的链接:apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/…【参考方案2】:由于flink kafka消费者的group.id除了向zookeeper提交offset之外没有太多用处。就 flink kafka 消费者而言,是否有任何偏移量监控方式。我可以看到有一种方法 [在 consumer-groups/consumer-offset-checker 的帮助下] 适用于控制台消费者,但不适用于 flink kafka 消费者。
我们想看看我们的 flink kafka 消费者如何落后/落后于 kafka 主题大小[给定时间点主题中的消息总数],可以在分区级别进行。
【讨论】:
这是答案还是问题?以上是关于flink kafka消费者组ID不起作用的主要内容,如果未能解决你的问题,请参考以下文章