Java API获取非compacted topic总消息数
Posted huxi_2b
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java API获取非compacted topic总消息数相关的知识,希望对你有一定的参考价值。
目前Kafka并没有提供直接的工具来帮助我们获取某个topic的当前总消息数,需要我们自行写程序来实现。下列代码可以实现这一功能,特此记录一下:
/** * 获取某个topic的当前消息数 * Java 8+ only * * @param topic * @param brokerList * @return */ public static long totalMessageCount(String topic, String brokerList) { Properties props = new Properties(); props.put("bootstrap.servers", brokerList); props.put("group.id", "test-group"); props.put("enable.auto.commit", "false"); props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer"); try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) { List<TopicPartition> tps = Optional.ofNullable(consumer.partitionsFor(topic)) .orElse(Collections.emptyList()) .stream() .map(info -> new TopicPartition(info.topic(), info.partition())) .collect(Collectors.toList()); Map<TopicPartition, Long> beginOffsets = consumer.beginningOffsets(tps); Map<TopicPartition, Long> endOffsets = consumer.endOffsets(tps); return tps.stream().mapToLong(tp -> endOffsets.get(tp) - beginOffsets.get(tp)).sum(); } }
以上是关于Java API获取非compacted topic总消息数的主要内容,如果未能解决你的问题,请参考以下文章
SpaceMouse Compact 无法在 Ubuntu 上的 Chrome 中使用 js Gamepad api
Apache Kafka:使用java方式操作消费组和重置分区偏移量(admin api)
从 SQL Server Compact 数据库获取实时数据