kafka命令行的管理使用

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kafka命令行的管理使用相关的知识,希望对你有一定的参考价值。

参考技术A

首先要启动好kafka集群
1、集群时间同步
2、启动zookeeper集群
3、启动kafka集群
启动kafka集群的方式就是在集群中每台机器 kafka目录 下运行
nohup bin/kafka-server-start.sh config/server.properties >/dev/null 2>&1 &

kafka发出消息和接收消息都是基于topic,所以要先创建一个topic,才能向里面发消息。创建topic的脚本:

topic创建好了,就可以向里边发送消息了。

通过命令行实现数据的发送 producer 生产者
kafka-console-producer.sh 就是用来测试用的脚本,可以模拟kafka消息的发送端。
直接运行 kafka-console-producer.sh 查看帮助

--broker-list 指定我们kafka集群的地址
--topic 指定我们的消息发送到哪个topic里面去

通过命令行实现数据的接收 consumer 消费者

--bootstrap-server 表示我们的kafak集群的地址,在旧版本中使用的是--zookeeper参数,两者至少使用一个
--from-beginning 表示我们从最开始的数据进行消费
--topic指定我们topic的名字

在producer端发送数据,在consumer端可以收到数据

32 | KafkaAdminClient:Kafka的运维利器

文章目录


管理与监控

32 | KafkaAdminClient:Kafka的运维利器

引入原因

首先,不论是 Windows 平台,还是 Linux 平台,命令行的脚本都只能运行在控制台上。如果想要在应用程序、运维框架或是监控平台中集成它们,会非常得困难。

其次,这些命令行脚本很多都是通过连接 ZooKeeper 来提供服务的。目前,社区已经越来越不推荐任何工具直连 ZooKeeper 了,因为这会带来一些潜在的问题,比如这可能会绕过 Kafka 的安全设置。

最后,运行这些脚本需要使用 Kafka 内部的类实现,也就是 Kafka 服务器端的代码。实际上,社区还是希望用户只使用 Kafka 客户端代码,通过现有的请求机制来运维管理集群。这样的话,所有运维操作都能纳入到统一的处理机制下,方便后面的功能演进。

基于以上原因,社区于 0.11 版本正式推出了 Java 客户端版的 AdminClient,并不断地在后续的版本中对它进行完善。

如何使用?

如果使用的是 Maven,需要增加以下依赖项:

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>2.3.0</version>
</dependency>

如果使用的是 Gradle,需要增加以下依赖项:

compile group: 'org.apache.kafka', name: 'kafka-clients', version: '2.3.0'

功能

在 2.3 版本中,AdminClient 提供的功能有 9 大类。

  1. 主题管理:包括主题的创建、删除和查询。
  2. 权限管理:包括具体权限的配置与删除。
  3. 配置参数管理:包括 Kafka 各种资源的参数设置、详情查询。所谓的 Kafka 资源,主要有
    Broker、主题、用户、Client-id 等。
  4. 副本日志管理:包括副本底层日志路径的变更和详情查询。
  5. 分区管理:即创建额外的主题分区。
  6. 消息删除:即删除指定位移之前的分区消息。
  7. Delegation Token 管理:包括 Delegation Token 的创建、更新、过期和详情查询。
  8. 消费者组管理:包括消费者组的查询、位移查询和删除。
  9. Preferred 领导者选举:推选指定主题分区的 Preferred Broker 为领导者。

工作原理

从设计上来看,AdminClient 是一个双线程的设计:前端主线程和后端 I/O 线程。 前端线程负责将用户要执行的操作转换成对应的请求,然后再将请求发送到后端 I/O 线程的队列中;而后端 I/O 线程从队列中读取相应的请求,然后发送到对应的 Broker 节点上,之后把执行结果保存起来,以便等待前端线程的获取。

如图所示,前端主线程会创建名为 Call 的请求对象实例。该实例有两个主要的任务。

  1. 构建对应的请求对象。 比如,如果要创建主题,那么就创建 CreateTopicsRequest;如果是查询消费者组位移,就创建 OffsetFetchRequest。
  2. 指定响应的回调逻辑。 比如从 Broker 端接收到 CreateTopicsResponse 之后要执行的动作。一旦创建好 Call 实例,前端主线程会将其放入到新请求队列(New Call Queue)中,此时,前端主线程的任务就算完成了。它只需要等待结果返回即可。

后端 I/O 线程使用了 3 个队列来承载不同时期的请求对象,它们分别是新请求队列、待发送请求队列和处理中请求队列

为什么要使用 3 个呢? 因为目前新请求队列的线程安全是由 Java 的 monitor 锁来保证的。为了确保前端主线程不会因为 monitor 锁被阻塞,后端 I/O 线程会定期地将新请求队列中的所有 Call 实例全部搬移到待发送请求队列中进行处理。待发送请求队列和处理中请求队列只由后端 I/O 线程处理,因此无需任何锁机制来保证线程安全。

当 I/O 线程在处理某个请求时,它会显式地将该请求保存在处理中请求队列。一旦处理完成,I/O 线程会自动地调用 Call 对象中的回调逻辑完成最后的处理。把这些都做完之后,I/O 线程会通知前端主线程说结果已经准备完毕,这样前端主线程能够及时获取到执行操作的结果。AdminClient 是使用 Java Object 对象的 wait 和 notify 实现的这种通知机制。

严格来说,AdminClient 并没有使用 Java 已有的队列去实现上面的请求队列,它是使用 ArrayList 和 HashMap 这样的简单容器类,再配以 monitor 锁来保证线程安全的。

后端 I/O 线程的名字的前缀是 kafka-admin-client-thread如果发现 AdminClient 程序貌似在正常工作,但执行的操作没有返回结果,或者 hang 住了,这可能是因为 I/O 线程出现问题导致的,可以使用 jstack 命令去查看一下 AdminClient 程序,确认下 I/O 线程是否在正常工作。

构造和销毁 AdminClient 实例

AdminClient 对象的完整类路径是 org.apache.kafka.clients.admin.AdminClient,而不是 kafka.admin.AdminClient。后者就是服务器端的 AdminClient,它已经不被推荐使用了。

创建 AdminClient 实例和创建 KafkaProducer 或 KafkaConsumer 实例的方法是类似的,需要手动构造一个 Properties 对象或 Map 对象,然后传给对应的方法。社区专门为 AdminClient 提供了几十个专属参数,最常见而且必须要指定的参数,是 bootstrap.servers 参数。如果要销毁 AdminClient 实例,需要显式调用 AdminClient 的 close 方法。

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka-host:port");
props.put("request.timeout.ms", 600000);

try (AdminClient client = AdminClient.create(props)) 
         // 执行要做的操作……

常见的 AdminClient 应用实例

创建主题

String newTopicName = "test-topic";
try (AdminClient client = AdminClient.create(props)) 
         NewTopic newTopic = new NewTopic(newTopicName, 10, (short) 3);
         CreateTopicsResult result = client.createTopics(Arrays.asList(newTopic));
         result.all().get(10, TimeUnit.SECONDS);

这段代码调用 AdminClient 的 createTopics 方法创建对应的主题。构造主题的类是 NewTopic 类,它接收主题名称、分区数和副本数三个字段。

目前,AdminClient 各个方法的返回类型都是名为 ***Result 的对象。这类对象会将结果以 Java Future 的形式封装起来。如果要获取运行结果,需要调用相应的方法来获取对应的 Future 对象,然后再调用相应的 get 方法来取得执行结果。

查询消费者组位移

String groupID = "test-group";
try (AdminClient client = AdminClient.create(props)) 
         ListConsumerGroupOffsetsResult result = client.listConsumerGroupOffsets(groupID);
         Map<TopicPartition, OffsetAndMetadata> offsets = 
                  result.partitionsToOffsetAndMetadata().get(10, TimeUnit.SECONDS);
         System.out.println(offsets);

调用 AdminClient 的 listConsumerGroupOffsets 方法去获取指定消费者组的位移数据。

获取 Broker 磁盘占用

try (AdminClient client = AdminClient.create(props)) 
         DescribeLogDirsResult ret = client.describeLogDirs(Collections.singletonList(targetBrokerId)); // 指定Broker id
         long size = 0L;
         for (Map<String, DescribeLogDirsResponse.LogDirInfo> logDirInfoMap : ret.all().get().values()) 
                  size += logDirInfoMap.values().stream().map(logDirInfo -> logDirInfo.replicaInfos).flatMap(
                           topicPartitionReplicaInfoMap ->
                           topicPartitionReplicaInfoMap.values().stream().map(replicaInfo -> replicaInfo.size))
                           .mapToLong(Long::longValue).sum();
         
         System.out.println(size);

这段代码的主要思想是,使用 AdminClient 的 describeLogDirs 方法获取指定 Broker 上所有分区主题的日志路径信息,然后把它们累积在一起,得出总的磁盘占用量。

开发者涨薪指南 48位大咖的思考法则、工作方式、逻辑体系

以上是关于kafka命令行的管理使用的主要内容,如果未能解决你的问题,请参考以下文章

Kafka在命令行的启动流程

Apache Kafka:使用java和命令行方式操作topic(admin api)

Kafka的命令行工具

32 | KafkaAdminClient:Kafka的运维利器

idea 启动命令行的时候提示不能创建PTY

两本最近阅读的工具书的记录 关于Python和Linux命令行的 不喜勿喷 只是写给自己用