获取kafka中的偏移

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了获取kafka中的偏移相关的知识,希望对你有一定的参考价值。

kafka0.10.0.0的版本:
for (i <- 0 to (offsetList.length - 1)) {
var topicPartition = new TopicPartition(topic, i)
topicArray.add(topicPartition)

kafkaConsumer.assign(util.Arrays.asList(topicPartition))
kafkaConsumer.seekToEnd(util.Arrays.asList(topicPartition))
val latestOffset = kafkaConsumer.position(topicPartition)
logInfo("partition"+i+"latestOffset"+latestOffset)
endOffsetMap.put(topicPartition, latestOffset)
}

kafka:0.11.0.0版本:
def test() = {
var endOffsetMap1 = kafkaConsumer.endOffsets(topicArray)
import scala.collection.JavaConversions._
for (entry <- endOffsetMap) {
var key: TopicPartition = entry._1
logInfo("test -----topic:"+this.topic +" partition:" + key.partition() +" endOffset:" + entry._2)
}

}

以上是关于获取kafka中的偏移的主要内容,如果未能解决你的问题,请参考以下文章

获取主题和分区偏移量

如何获取 kafka 主题分区的最新偏移量?

如何获取 kafka 主题分区的最后/结束偏移量?

如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?

如何在从 Spark 消费 Kafka 时获取偏移 id,将其保存在 Cassandra 中并使用它来重新启动 Kafka?

kafka根据offset查找消息流程