如何使用 JAVA API 从 Kafka 获取每个主题的消息数量 [重复]

Posted

技术标签:

【中文标题】如何使用 JAVA API 从 Kafka 获取每个主题的消息数量 [重复]【英文标题】:How to get Message no of Messages per topic From Kafka using JAVA API [duplicate] 【发布时间】:2016-12-12 11:31:01 【问题描述】:

我想知道如何通过 java api 获取 kafka 中每个主题的消息数,我不知道想使用下一篇文章中提到的命令行工具。知道怎么做吗?

PS:我不想循环通过 KAFKA 消费者流来计算计数,我试图在开始时计算这个计数(在从 Kafka 消费之前)

Java, How to get number of messages in a topic in apache kafka

【问题讨论】:

【参考方案1】:

使用新的KafkaConsumer,您可以使用seekToBeginning(...)seekToEnd(...) 并计算每个分区的最大和最小偏移量的差值并将这些数字相加。

如果你寻找,你就不会消费消息。请记住,搜索是惰性的,即您需要使用 position(...) 来实际触发搜索。由于懒惰,两种搜索方法都不会返回任何内容。但是,position(...) 会给出可用于计算的偏移量。

见http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/clients/consumer/KafkaConsumer.html

【讨论】:

这很好用,除了 MapR 版本的 Kafka (MapR Streams)。偏移量是记录开始的实际字节偏移量,而不是原始 Kafka 中使用的“增量一”偏移量......所以取最早和最新之间的差异只会告诉你消息的总和(而不是即便如此,真的)使用 mapR。 这个问题没有说任何关于使用 MapR Streams 的内容(AFAIK 不是 Kafka 的 MapR 版本,而是一个新系统——仅兼容 API)。 你说得对。问题是关于“真正的”卡夫卡,这就是为什么 +1

以上是关于如何使用 JAVA API 从 Kafka 获取每个主题的消息数量 [重复]的主要内容,如果未能解决你的问题,请参考以下文章

java工程kafka传递自定义对象,消费端获取到的是null

如何获取Kafka的消费者详情——从Scala到Java的切换

从Java API创建Kafka主题[重复]

如何从 Kafka JSON 消息中获取 org.apache.kafka.connect.data.Decimal 值 [重复]

学习使用哪个 Kafka API 以将传统集成系统转换为 Apache Kafka

kafka通过java api 获取当前消费组offset/logsize/lag信息,实现消费延迟监控