Kafka入门系列(十三) 如何查看topic消息数
Posted 大数据Kafka技术分享
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka入门系列(十三) 如何查看topic消息数相关的知识,希望对你有一定的参考价值。
相信很多人都问过这样的问题:我如何知道某个topic到底生产了多少条消息呢?其实Kafka官方提供了脚本用于查询,只是该脚本的使用方法没有记录在官方的文档中。另外我们也可以自己写程序调用client API来自行查询。下面我们就分别看看如何使用这两种方式来查询topic消息数。
GetOffsetShell脚本方式
虽然kafka.tools.GetOffsetShell工具的使用方法没有记录在官方文档中,但使用它却非常简单。假设我也查询名为`test`的topic的消息数,具体命令如下:
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -1
-1表示查询test各个分区当前最大的消息位移值(注意,这里的位移不只是consumer端的位移,而是指消息在每个分区的位置)
如果你要查询曾经生产过的最大消息数,那么只运行上面这条命令然后把各个分区的结果相加就可以了。但如果你需要查询当前集群中该topic的消息数,那么还需要运行下面这条命令:
kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list localhost:9092 --topic test --time -2
-2表示去获取当前各个分区的最小位移。之后把运行第一条命令的结果与刚刚获取的位移之和相减就是集群中该topic的当前消息总数。
Java API方式
写程序实现这个也不难,主要是借助KafkaConsumer的帮助,具体代码如下:
String brokerUrls = "localhost:9092";
String topic = "test";
int numPartitions = 6;
Properties props = new Properties();
props.put("bootstrap.servers", brokerUrls);
props.put("group.id", "test-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList(topic));
List<TopicPartition> partitions = new ArrayList<>();
for (int i = 0; i < numPartitions; ++i) {
partitions.add(new TopicPartition(topic, i));
}
Map<TopicPartition, Long> offsets = consumer.endOffsets(partitions);
Map<TopicPartition, Long> earlyOffsets = consumer.beginningOffsets(partitions);
int totalCount = 0;
for (TopicPartition tp: offsets.keySet()) {
totalCount += (offsets.get(tp) - earlyOffsets.get(tp));
}
System.out.println("Total message count: " + totalCount);
consumer.close();
上面的代码非常简单,就是分别调用endOffsets和beginningOffsets去获取各个分区当前对应的最大位移和最小位移。其实和脚本方式是一致的。
以上是关于Kafka入门系列(十三) 如何查看topic消息数的主要内容,如果未能解决你的问题,请参考以下文章