Kafka 日志中缺少偏移量 - 简单消费者无法继续
Posted
技术标签:
【中文标题】Kafka 日志中缺少偏移量 - 简单消费者无法继续【英文标题】:Offset missing from Kafka logs - Simple Consumer unable to proceed 【发布时间】:2013-10-06 08:47:14 【问题描述】:我有一个 3 节点的 kafka 集群设置。我正在使用storm来阅读来自kafka的消息。我系统中的每个主题都有 7 个分区。
现在我面临一个奇怪的问题。直到 3 天前,一切正常。但是,现在看来我的风暴拓扑无法专门从 2 个分区 - #1 和 #4 读取。
我试图深入研究这个问题,发现在我的 kafka 日志中,对于这两个分区,都缺少一个偏移量,即在 5964511 之后,下一个偏移量是 5964513 而不是 5964512。
由于缺少偏移量,简单消费者无法继续进行下一个偏移量。我做错了什么还是已知的错误?
这种行为可能是什么原因?
我正在使用以下代码来读取有效偏移量的窗口:
public static long getLastOffset(SimpleConsumer consumer, String topic, int partition,
long whichTime, String clientName)
TopicAndPartition topicAndPartition = new TopicAndPartition(topic, partition);
Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfoMap = new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
requestInfoMap.put(topicAndPartition, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 100));
OffsetRequest request = new OffsetRequest( requestInfoMap, kafka.api.OffsetRequest.CurrentVersion() , clientName);
OffsetResponse response = consumer.getOffsetsBefore(request);
long[] validOffsets = response.offsets(topic, partition);
for (long validOffset : validOffsets)
System.out.println(validOffset + " : ");
long largestOffset = validOffsets[0];
long smallestOffset = validOffsets[validOffsets.length - 1];
System.out.println(smallestOffset + " : " + largestOffset );
return largestOffset;
这给了我以下输出:
4529948 : 6000878
所以,我提供的偏移量正好在偏移范围内。
【问题讨论】:
【参考方案1】:抱歉回复晚了,但是...
我通过使用 Long 实例 var 来保存要读取的下一个偏移量,然后在 fetch 之后检查返回的 FetchResponse 是否有错误()来为这种情况编写代码。如果出现错误,我将下一个偏移值更改为一个合理的值(可能是下一个偏移量或最后一个可用的偏移量)并重试。
【讨论】:
以上是关于Kafka 日志中缺少偏移量 - 简单消费者无法继续的主要内容,如果未能解决你的问题,请参考以下文章