Kafka获取订阅某topic的所有consumer group客户端版
Posted huxi_2b
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kafka获取订阅某topic的所有consumer group客户端版相关的知识,希望对你有一定的参考价值。
之前写过如何用服务器端的API代码来获取订阅某topic的所有consumer group,参见这里。使用服务器端的API需要用到kafka.admin.AdminClient类,但是这个类在0.11.0.0版本已经被标记为不推荐使用了,故目前最合适的方式还是通过客户端API:org.apache.kafka.clients.admin.AdminClient。今天碰到有人问这个问题,我就尝试写了一个。使用之前你需要引入kafka client包依赖(以2.2.0版本为例)
Maven:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.2.0</version>
</dependency>
Gradle:
compile group: \'org.apache.kafka\', name: \'kafka-clients\', version: \'2.2.0\'
下面是代码:
1 private static List<String> getGroupsForTopic(String brokerServers, String topic)
2 throws ExecutionException, InterruptedException, TimeoutException {
3 Properties props = new Properties();
4 props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, brokerServers);
5
6 try (AdminClient client = AdminClient.create(props)) {
7 List<String> allGroups = client.listConsumerGroups()
8 .valid()
9 .get(10, TimeUnit.SECONDS)
10 .stream()
11 .map(ConsumerGroupListing::groupId)
12 .collect(Collectors.toList());
13
14 Map<String, ConsumerGroupDescription> allGroupDetails =
15 client.describeConsumerGroups(allGroups).all().get(10, TimeUnit.SECONDS);
16
17 final List<String> filteredGroups = new ArrayList<>();
18 allGroupDetails.entrySet().forEach(entry -> {
19 String groupId = entry.getKey();
20 ConsumerGroupDescription description = entry.getValue();
21 boolean topicSubscribed = description.members().stream().map(MemberDescription::assignment)
22 .map(MemberAssignment::topicPartitions)
23 .map(tps -> tps.stream().map(TopicPartition::topic).collect(Collectors.toSet()))
24 .anyMatch(tps -> tps.contains(topic));
25 if (topicSubscribed)
26 filteredGroups.add(groupId);
27 });
28 return filteredGroups;
29 }
30 }
我会假设你的集群中没有配置安全认证和授权机制或者发起此AdminClient的用户是合法用户且有CLUSTER以及GROUP的DESCRIBE权限。
另外值得注意的是,上面这个函数无法获取非运行中的consumer group,即虽然一个group订阅了某topic,但是若它所有的consumer成员都关闭的话这个函数是不会返回该group的。
以上是关于Kafka获取订阅某topic的所有consumer group客户端版的主要内容,如果未能解决你的问题,请参考以下文章
Kafka Java consumer动态修改topic订阅