查询订阅某topic的所有consumer group(Java API)

Posted huxi_2b

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了查询订阅某topic的所有consumer group(Java API)相关的知识,希望对你有一定的参考价值。

在网上碰到的问题,想了下使用现有的API还是可以实现的。

首先,需要引入Kafka服务器端代码,比如加入Kafka 1.0.0依赖:

Maven

<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>1.0.0</version>
</dependency>

Gradle

compile group: ‘org.apache.kafka‘, name: ‘kafka_2.12‘, version: ‘1.0.0‘

然后编写获取订阅某topic的所有group的方法,代码如下:

/**
     * get all subscribing consumer group names for a given topic
     * @param brokerListUrl localhost:9092 for instance
     * @param topic         topic name
     * @return
     */
    public static Set<String> getAllGroupsForTopic(String brokerListUrl, String topic) {
        AdminClient client = AdminClient.createSimplePlaintext(brokerListUrl);

        try {
            List<GroupOverview> allGroups = scala.collection.JavaConversions.seqAsJavaList(client.listAllGroupsFlattened().toSeq());
            Set<String> groups = new HashSet<>();
            for (GroupOverview overview: allGroups) {
                String groupID = overview.groupId();
                Map<TopicPartition, Object> offsets = scala.collection.JavaConversions.mapAsJavaMap(client.listGroupOffsets(groupID));
                Set<TopicPartition> partitions = offsets.keySet();
                for (TopicPartition tp: partitions) {
                    if (tp.topic().equals(topic)) {
                        groups.add(groupID);
                    }
                }
            }
            return groups;
        } finally {
            client.close();
        }
    }  




以上是关于查询订阅某topic的所有consumer group(Java API)的主要内容,如果未能解决你的问题,请参考以下文章

redis消息模式事务慢日志查询备份恢复

在 Context.Consumer 创建的 useEffect 清理函数中取消所有订阅

Kafka Rebalance机制分析

php 从rabbitmq consume 和 get的区别

Kafka 模式

Kafka Consumer(消费者组)