集群管理工具KafkaAdminClient——改造

Posted 朱小厮的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了集群管理工具KafkaAdminClient——改造相关的知识,希望对你有一定的参考价值。




1
前文概述

在上一篇文章《》中讲述了KafkaAdminClient的功能以及相应的原理,但是同时也提出了目前的KafkaAdminClient并没有非常的完善,还有许多功能还需要去丰富,这些功能可以自定义实现,在《》一文中介绍了如何获取Kafka的消费详情,其原理是通过Java调用Kafka的Scala代码实现的,如果要使用纯Java的方式实现就需要用到了KafkaAdminClient,另外Scala版的AdminClient也被标注为:“This client is deprecated, and will be replaced by KafkaAdminClient.”,说明官方也推荐使用KafkaAdminClient。不过现在的版本(目前最新1.1.0)并没有提供类似describeConsumerGroup和listGroupOffsets的方法实现,这一点在前文《》也有提及,所以如果要实现获取类似消费者详情的功能,那么就需要自己动手进行改造。

参考Scala版的AdminClient,要实现获取Kafka的消费者详情的功能首先需要实现describeConsumerGroup和listGroupOffsets的方法,其中describeConsumerGroup方法内部还需要一个findCoordinator的方法用来提供消费者对应的coodinator节点,以便提供详细的消费者详情。describeConsumerGroup、listGroupOffsets和findCoordinator这三个方法都将在KafkaAdminClient类里提供自定义实现。KafkaAdminClient和XXXOptions、XXXResult的类都位于org.apache.kafka.clients.admin包下,笔者也将扩展的类也置于其同一包下,不过也进行了一些区分,如下图所示,新加入的XXXOptions、XXXResult类放入extend下,新加入的JavaBean放入model下,然后与具体应用功能对应的放在app下:

集群管理工具KafkaAdminClient——改造

首先建立对应的XXXOptions、XXXResult类,就那简单的ListGroupOffsets来说,其ListGroupOffsetsOptions只是继承了AbstractOptions的空实现,而ListGroupOffsetsResult也很简单,提供了一个KafkaFuture的调用,代码参考如下:

public class ListGroupOffsetsResult {
   private final KafkaFutureImpl<Map<TopicPartition, Long>> future;
   public ListGroupOffsetsResult(KafkaFutureImpl<Map<TopicPartition, Long>> future) {
       this.future = future;
   }
   public KafkaFutureImpl<Map<TopicPartition, Long>> values(){
       return this.future;
   }
}

model目录下的ConsumerGroupSummary是所要实现的describeConsumerGroup方法中所要获取的值类型,封装在DescribeConsumerGroupResult 中;ConsumerSummary在describeConsumerGroup方法内部使用,用来封装消费状态,包括consumerId、clientId、host(消费者主机)以及TopicPartition列表,最终被封装进ConsumerGroupSummary中。PartitionAssignmentState是服务于KafkaConsumerGroupService的,用来最后显示消费者详情列表。

KafkaAdminClient的父类是AdminClient(kafka-client中的抽象类),describeConsumerGroup、listGroupOffsets和findCoordinator这三个方法也需要在AdminClient类中做申明,详细参考如下:

public abstract DescribeConsumerGroupResult describeConsumerGroup(final String group,
                                                        final DescribeConsumerGroupOptions options
)
;
public DescribeConsumerGroupResult describeConsumerGroup(final String group) {
   return describeConsumerGroup(group, new DescribeConsumerGroupOptions());
}
public abstract FindCoordinatorResult findCoordinator(final String group,
                                                     final FindCoordinatorOptions options
)
;
public FindCoordinatorResult findCoordinator(final String group) {
   return findCoordinator(group, new FindCoordinatorOptions());
}
public abstract ListGroupOffsetsResult listGroupOffsets(final String group,
                                                       final ListGroupOffsetsOptions options
)
;
public ListGroupOffsetsResult listGroupOffsets(final String group){
   return listGroupOffsets(group, new ListGroupOffsetsOptions());
}

在前面2篇文章《》和《》中都详细解释了describeConsumerGroup、listGroupOffsets方法,所以这里不在赘述,具体实现也很简单,可以参考笔者的实现:https://github.com/hiddenzzh/kafka/blob/master/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java

本篇以及《》、《》这三篇文章都是围绕如何获取消费者详情来做具体的陈述,回到问题的初衷:kafka.admin.ConsumerGroupCommand.PartitionAssignmentState无法被外部访问,那么真的需要这么复杂的转变过程么,详细请参考下下篇《Scala与Java语言的互操作》。

END

Kafka解析之topic创建(3)——合法性验证

《》

《》

《》

《》

     

以上是关于集群管理工具KafkaAdminClient——改造的主要内容,如果未能解决你的问题,请参考以下文章

集群管理工具KafkaAdminClient——原理与示例

32 | KafkaAdminClient:Kafka的运维利器

kafka AdminClient 获取kafka版本

node 集群与稳定

使用 kubectl 管理 kubeconfig 配置文件

nacos2.0集群,关于改了端口还是报端口占用的问题