Kafka 日志中缺少偏移量 - 简单消费者无法继续

Posted

技术标签:

【中文标题】Kafka 日志中缺少偏移量 - 简单消费者无法继续【英文标题】:Offset missing from Kafka logs - Simple Consumer unable to proceed 【发布时间】:2013-10-06 08:47:14 【问题描述】:

我有一个 3 节点的 kafka 集群设置。我正在使用storm来阅读来自kafka的消息。我系统中的每个主题都有 7 个分区。

现在我面临一个奇怪的问题。直到 3 天前,一切正常。但是,现在看来我的风暴拓扑无法专门从 2 个分区 - #1 和 #4 读取。

我试图深入研究这个问题,发现在我的 kafka 日志中,对于这两个分区,都缺少一个偏移量,即在 5964511 之后,下一个偏移量是 5964513 而不是 5964512。

由于缺少偏移量,简单消费者无法继续进行下一个偏移量。我做错了什么还是已知的错误?

这种行为可能是什么原因?

我正在使用以下代码来读取有效偏移量的窗口:

public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
                                 long whichTime, String clientName) 
    TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
    OffsetRequest request = new OffsetRequest( requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName);
    OffsetResponse response = consumer.getOffsetsBefore(request);
    long[] validOffsets = response.offsets(topic, partition);
    for (long validOffset : validOffsets) 
        System.out.println(validOffset + " : ");
    
    long largestOffset = validOffsets[0];
    long smallestOffset = validOffsets[validOffsets.length - 1];
    System.out.println(smallestOffset + " : " + largestOffset );
    return largestOffset;

这给了我以下输出:

4529948 : 6000878

所以,我提供的偏移量正好在偏移范围内。

【问题讨论】:

【参考方案1】:

抱歉回复晚了,但是...

我通过使用 Long 实例 var 来保存要读取的下一个偏移量,然后在 fetch 之后检查返回的 FetchResponse 是否有错误()来为这种情况编写代码。如果出现错误,我将下一个偏移值更改为一个合理的值(可能是下一个偏移量或最后一个可用的偏移量)并重试。

【讨论】:

以上是关于Kafka 日志中缺少偏移量 - 简单消费者无法继续的主要内容,如果未能解决你的问题,请参考以下文章

一探究竟,详解Kafka生产者和消费者的工作原理!

Kafka手动提交偏移量的作用到底是什么???

如何获取 kafka 主题分区的最新偏移量?

Kafka消费者偏移量

Kafka Connect - 无法提交偏移量和刷新

Kafka 入门篇