Kafka 0.10.0.1 consumer get earliest partition offset from Kafka broker cluster - scala code

Posted yjyyjy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka 0.10.0.1 consumer get earliest partition offset from Kafka broker cluster - scala code相关的知识,希望对你有一定的参考价值。

Return: Map[TopicPartition, Long] 

Code:

val props = new Properties()
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaPara("bootstrap.servers").toString)
props.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaPara("group.id").toString)
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer")

val kc: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)

kc.partitionsFor(new String(topic)).asScala.map{partitionInfo =>

val topicPartition = new TopicPartition(topic, partitionInfo.partition())
kc.assign(Seq(topicPartition).asJava)
kc.seekToBeginning(Seq(topicPartition).asJava)
topicPartition ->  kc.position(topicPartition)
}.toMap

Key point: Scala code call Java lib

以上是关于Kafka 0.10.0.1 consumer get earliest partition offset from Kafka broker cluster - scala code的主要内容,如果未能解决你的问题,请参考以下文章

Kafka的Topic的partitions数目设置最佳实践

kafka consumer重新连接后如何获取当前最新数据

kafka consumer offset机制

windows下kafka源码阅读环境搭建

kafka的consumer.properties的group.id到底有啥用

如何创建kafka consumer