Consumer.endOffsets 如何在 Kafka 中工作?
Posted
技术标签:
【中文标题】Consumer.endOffsets 如何在 Kafka 中工作?【英文标题】:How does Consumer.endOffsets work in Kafka? 【发布时间】:2019-03-22 16:59:15 【问题描述】:假设我有一个无限期运行的计时器任务,它遍历 kafka 集群中的所有消费者组,并为每个组的所有分区输出延迟、提交的偏移量和结束偏移量。与 Kafka 控制台消费者组脚本的工作方式类似,但它适用于所有组。
类似
单一消费者 - 不工作 - 不返回某些提供的主题分区的偏移量(例如提供 10 个 - 返回 5 个偏移量)
Consumer consumer;
static
consumer = createConsumer();
run()
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds)
List<TopicParition> topicParitions = getTopicParitions(groupId);
consumer.endOffsets(topicParitions); -- Not working - missing offsets for some partitions for some groups (in 10 - out 5)
多个消费者 - 工作
run()
List<String> groupIds = getConsumerGroups();
for(String groupId: groupIds)
List<TopicParition> topicParitions = getTopicParitions(groupId);
Consumer consumer = createConsumer();
consumer.endOffsets(topicParitions); This works!!!
版本:Kafka-Client 2.0.0
我是否错误地使用了消费者 API?理想情况下,我想使用单一消费者。
如果您需要更多详细信息,请告诉我。
【问题讨论】:
【参考方案1】:我想你快到了。首先收集所有您感兴趣的主题分区,然后发出consumer.endOffsets
命令。
请记住,我没有尝试运行它,但这样的事情应该可以工作:
run()
Consumer consumer = createConsumer();
List<String> groupIds = getConsumerGroups();
List<TopicPartition> topicPartitions = new ArrayList<>();
for (String groupId: groupIds)
topicPartitions.addAll(getTopicPartitions(groupId));
consumer.endOffsets(topicPartitions);
【讨论】:
谢谢。当我使用单个消费者时,您为什么认为 consumer.endOffsets 不会返回所有分区的偏移量,如第一个代码 sn-p 所示?【参考方案2】:这是Fetcher.fetchOffsetsByTimes()
中的一个错误,特别是在groupListOffsetRequests
方法中,其中逻辑没有添加用于重试的分区,其中请求分区偏移量的领导者未知或不可用。
当您在所有消费者组分区中使用单个消费者时,当我们请求endoffsets
时某些组已经具有主题分区领导者信息并且对于没有领导者信息未知或不可用的主题分区,这会更加明显由于错误而停止。
后来,我意识到从每个消费者组中提取主题分区并不是一个好主意,而是更改为从 AdminClient.listTopics & AdminClient.describeTopics
读取主题分区并将所有内容一次性传递给 Consumer.endOffsets
。
虽然这不能完全解决问题,因为主题/分区在多次运行之间可能仍然不可用或未知。
可以找到更多信息 - KAFKA-7044
和 pull request
。这已修复并计划在 2.1.0 版本中发布。
【讨论】:
以上是关于Consumer.endOffsets 如何在 Kafka 中工作?的主要内容,如果未能解决你的问题,请参考以下文章