不同主题使用相同的偏移值
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了不同主题使用相同的偏移值相关的知识,希望对你有一定的参考价值。
我们的拓扑使用KafkaSpout
从kafka主题获取消息。我们有大约150个主题,包括12个分区,8个风暴执行器和2个风暴节点上的任务。 Storm版本1.0.5,Kafka经纪人版本10.0.2,Kafka客户端版本0.9.0.1。我们不会删除Kafka主题。
在某个时刻,我在worker.log中观察到大量重复的WARN消息
2018-05-29 14:36:57.928 oaskKafkaUtils Thread-15-kafka-spout-executor [18 18] [WARN]分区{host1:9092,topic = topic_1,partition = 10}获取偏移超出范围的获取请求:[9248]
2018-05-29 14:36:57.929 oaskKafkaUtils Thread-23-kafka-spout-executor [16 16] [WARN]分区{host = host2:9092,topic = topic_2,partition = 0}获取偏移量的提取请求范围:[22650006]
2018-05-29 14:36:57.930 oaskKafkaUtils Thread-23-kafka-spout-executor [16 16] [WARN]分区{host = host3:9092,topic = topic_3,partition = 4}获取偏移量的提取请求范围:[1011584]
2018-05-29 14:36:57.932 oaskKafkaUtils Thread-7-kafka-spout-executor [12 12] [WARN]分区{host1:9092,topic = topic4,partition = 4}获取偏移超出范围的获取请求:[9266]
2018-05-29 14:36:57.933 oaskKafkaUtils Thread-7-kafka-spout-executor [12 12] [WARN]分区{host = host2:9092,topic = topic5,partition = 4}获取偏移量的提取请求范围:[9266]
2018-05-29 14:36:57.935 oaskKafkaUtils Thread-23-kafka-spout-executor [16 16] [WARN]分区{host1:9092,topic = topic6,partition = 4}获取偏移超出范围的获取请求:[1011584]
2018-05-29 14:36:57.936 oaskKafkaUtils Thread-15-kafka-spout-executor [18 18] [WARN]分区{host = host2:9092,topic = topic6,partition = 10}获取偏移量的提取请求范围:[9248]
由于某种原因,相同的常量偏移值用于不同主题的相同分区。
我启用了DEBUG模式并更精确地观察了日志文件。
2018-05-29 14:37:03.573 oaskPartitionManager Thread-7-kafka-spout-executor [12 12] [DEBUG]写了最后完成的偏移量(1572936)到ZK for Partition {host = host3:9092,topic = topic1, partition = 8}用于拓扑:topology1
2018-05-29 14:37:03.577 oaskPartitionManager Thread-7-kafka-spout-executor [12 12] [DEBUG]将最后完成的偏移量(1572936)写入ZK for Partition {host = host1:9092,topic = topic2, partition = 8}用于拓扑:topology1
2018-05-29 14:37:03.578 oaskPartitionManager Thread-7-kafka-spout-executor [12 12] [DEBUG]将最后完成的偏移量(1572936)写入ZK for Partition {host = host2:9092,topic = topic3, partition = 8}用于拓扑:topology1
2018-05-29 14:38:07.581 oaskPartitionManager Thread-23-kafka-spout-executor [16 16] [DEBUG]写了最后完成的偏移量(61292573)到ZK for Partition {host = host1:9092,topic = topic4, partition = 8}用于拓扑:topology1
2018-05-29 14:38:07.582 oaskPartitionManager Thread-23-kafka-spout-executor [16 16] [DEBUG]写了最后完成的偏移量(61292573)到ZK for Partition {host = host2:9092,topic = topic5, partition = 8}用于拓扑:topology1
2018-05-29 14:38:07.584 oaskPartitionManager Thread-23-kafka-spout-executor [16 16] [DEBUG]将最后完成的偏移(61292573)写入ZK for Partition {host = host3:9092,topic = topic6, partition = 8}用于拓扑:topology1
我注意到所有主题的一部分被分成两个独立的组。每个小组由31个主题组成。每个组中的所有主题都为每个分区使用相同的偏移值。然而,该值不是恒定的,并且在8个不同的值之间变化。这8个值中的每一个对于组中的特定主题都是正确的。此外,这些值中的每一个都在不断增长,并且所有主题都同步更新。来自每个组的大多数主题(来自62的55个)具有相应的“偏移范围或范围”警告消息,但具有恒定值。其他7个主题在没有WARNING消息的情况下继续正常工作,但其偏移值也在发生变化。
我查看了storm-kafka
的源代码并注意到useStartOffsetTimeIfOffsetOutOfRange
标志在我们的情况下不起作用,因为我们没有失败的元组和kafka偏移量小于_emittedToOffset
。因此,一次又一次地记录相同的WARN消息。
} catch (TopicOffsetOutOfRangeException e) {
offset = KafkaUtils.getOffset(_consumer, _partition.topic, _partition.partition, kafka.api.OffsetRequest.EarliestTime());
// fetch failed, so don't update the fetch metrics
//fix bug [STORM-643] : remove outdated failed offsets
if (!processingNewTuples) {
// For the case of EarliestTime it would be better to discard
// all the failed offsets, that are earlier than actual EarliestTime
// offset, since they are anyway not there.
// These calls to broker API will be then saved.
Set<Long> omitted = this._failedMsgRetryManager.clearOffsetsBefore(offset);
// Omitted messages have not been acked and may be lost
if (null != omitted) {
_lostMessageCount.incrBy(omitted.size());
}
_pending.headMap(offset).clear();
LOG.warn("Removing the failed offsets for {} that are out of range: {}", _partition, omitted);
}
if (offset > _emittedToOffset) {
_lostMessageCount.incrBy(offset - _emittedToOffset);
_emittedToOffset = offset;
LOG.warn("{} Using new offset: {}", _partition, _emittedToOffset);
}
return;
}
但是我不明白_emittedToOffset
如何才能获得不同主题的相同价值。你可能有任何想法为什么会发生这种情况?
在Kafka经纪人失败时,storm-kafka源代码中存在一个错误。这里有相应的JIRA票和pull request修复。
以上是关于不同主题使用相同的偏移值的主要内容,如果未能解决你的问题,请参考以下文章