获取kafka中的偏移
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了获取kafka中的偏移相关的知识,希望对你有一定的参考价值。
kafka0.10.0.0的版本:
for (i <- 0 to (offsetList.length - 1)) {
var topicPartition = new TopicPartition(topic, i)
topicArray.add(topicPartition)
kafkaConsumer.assign(util.Arrays.asList(topicPartition))
kafkaConsumer.seekToEnd(util.Arrays.asList(topicPartition))
val latestOffset = kafkaConsumer.position(topicPartition)
logInfo("partition"+i+"latestOffset"+latestOffset)
endOffsetMap.put(topicPartition, latestOffset)
}
kafka:0.11.0.0版本:
def test() = {
var endOffsetMap1 = kafkaConsumer.endOffsets(topicArray)
import scala.collection.JavaConversions._
for (entry <- endOffsetMap) {
var key: TopicPartition = entry._1
logInfo("test -----topic:"+this.topic +" partition:" + key.partition() +" endOffset:" + entry._2)
}
}
以上是关于获取kafka中的偏移的主要内容,如果未能解决你的问题,请参考以下文章
如何获取 Kafka 偏移量以进行结构化查询以进行手动可靠的偏移量管理?
如何在从 Spark 消费 Kafka 时获取偏移 id,将其保存在 Cassandra 中并使用它来重新启动 Kafka?