Apache Kafka:使用java和命令行方式操作topic(admin api)
Posted 你是小KS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache Kafka:使用java和命令行方式操作topic(admin api)相关的知识,希望对你有一定的参考价值。
当前版本:kafka_2.12-2.8.0
1. 声明
当前内容主要为本人学习kafka的topic的增删改查操作,当前内容参考官方文档
- 其中
--bootstrap-server
表示当前的kafka的地址和端口 --topic
指定的就是主题名称--create,--delete,--alter,--list/--describe
表示各种操作
2. 使用命令行的操作
首先启动zookeeper,然后启动当前的kafka(当前kafka启动在192.168.1.105:9092
)
1.创建topic
./bin/kafka-topics.sh --create --topic test-events --bootstrap-server 192.168.1.105:9092
2.查询topic
./bin/kafka-topics.sh --describe --bootstrap-server 192.168.1.105:9092
./bin/kafka-topics.sh --list --bootstrap-server 192.168.1.105:9092
第一个是查询所有的topic并显示详细信息,第二个是只显示topic的名称
3.删除topic
./bin/kafka-topics.sh --bootstrap-server 192.168.1.105:9092 --delete --topic test-events
4.修改topic(只能修改config信息)
./bin/kafka-topics.sh --bootstrap-server 192.168.1.105:9092 --alter --topic test-events --partitions 40
3. 使用java方式的操作
导入pom依赖
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka_2.12</artifactId>
<version>2.8.0</version>
</dependency>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>2.8.0</version>
</dependency>
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.CreateTopicsResult;
import org.apache.kafka.clients.admin.DeleteTopicsResult;
import org.apache.kafka.clients.admin.KafkaAdminClient;
import org.apache.kafka.clients.admin.ListTopicsResult;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.KafkaFuture;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
/**
*
* @author hy
* @createTime 2021-05-30 13:54:03
* @description 当前内容主要为测试和使用Kafka的admin api
* 主要为当前的topic的基本操作
*
*/
public class AdminApiTest {
public static void main(String[] args) throws InterruptedException, ExecutionException, TimeoutException {
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.1.105:9092");
AdminClient client = KafkaAdminClient.create(props);
//addTopic(client, "test-create1");
listTopics(client);
deleteTopic(client, "test-create1");
listTopics(client);
// create.close();
client.close(2l, TimeUnit.SECONDS);
}
/**
*
* @author hy
* @createTime 2021-08-08 15:02:22
* @description 列出当前kafka的所有的主题
* @param client
*
*/
private static void listTopics(AdminClient client) {
ListTopicsResult listTopics = client.listTopics();
KafkaFuture<Set<String>> names = listTopics.names();
Set<String> topicSet = new HashSet<String>();
try {
topicSet = names.get();
} catch (InterruptedException | ExecutionException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println(topicSet);
}
/**
*
* @author hy
* @createTime 2021-08-08 16:10:09
* @description 添加主题操作
* @param client
* @param topicName
* @throws InterruptedException
* @throws ExecutionException
*
*/
private static void addTopic(AdminClient client, String topicName) throws InterruptedException, ExecutionException {
// 创建一个topic并设置分区为1,设置复制数量为1
// 注意Number of partitions must be larger than 0.
// Replication factor must be larger than 0.
NewTopic newTopic = new NewTopic(topicName, 1, (short) 1);
CreateTopicsResult createTopicsResult = client.createTopics(Arrays.asList(newTopic));
Map<String, KafkaFuture<Void>> values = createTopicsResult.values();
values.forEach((key, value) -> {
System.out.println("key=" + key + ",value=" + value.isDone());
});
}
/**
*
* @author hy
* @createTime 2021-08-08 16:11:20
* @description 执行当前删除主题的操作
* @param client
* @param topicName
*
*/
private static void deleteTopic(AdminClient client, String topicName) {
DeleteTopicsResult deleteTopicsResult = client.deleteTopics(Arrays.asList(topicName));
Map<String, KafkaFuture<Void>> values = deleteTopicsResult.values();
System.out.println(values);
}
}
测试成功
以上是关于Apache Kafka:使用java和命令行方式操作topic(admin api)的主要内容,如果未能解决你的问题,请参考以下文章
Apache Kafka:简单的命令行操作topic实现消息发送和接收