Apache RocketMQ:使用api实现对Topic的基本操作
Posted 你是小KS
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache RocketMQ:使用api实现对Topic的基本操作相关的知识,希望对你有一定的参考价值。
当前rocketmq版本4.9
1. 声明
当前内容主要为本人学习Rocketmq之用,当前内容部分参考官方文档
pom依赖和前面的一样:之前的pom依赖
2. 重要的概念
就是Name Server就是一个名称服务器群组,所有的broker都注册到名称服务器中,名称服务器控制灾难恢复(保存元数据信息)
就是领导者,主要控制集群控制,实际数据的读写等操作
- name server 默认启动在
9876
- broker server 默认启动在
10911
所以如果在Rocketmq中只要出现与外界交互的参数为addr的都是broker服务器,声明了namesrvaddr的才是名称服务器
,这个对在使用api操作的时候非常有用
3. Topic的基本操作Demo
本次采用的rocketmq的地址,使用单个进行测试
ip | 端口 | 用处 |
---|---|---|
192.168.1.102 | 9876 | name server |
192.168.1.102 | 10911 | broker server |
private static String namesrvAddr = "192.168.1.102:9876";
private static String brokerAddr = "192.168.1.102:10911";
private static String defaultTopicName = "localhost";
public static void main(String[] args) throws MQClientException {
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
consumer.setNamesrvAddr(namesrvAddr);
ClientConfig clientConfig = consumer.cloneClientConfig();
String buildMQClientId = consumer.buildMQClientId();
MQClientInstance mqClientInstance = new MQClientInstance(clientConfig, 0, buildMQClientId);
// 这个必须要开启
mqClientInstance.start();
MQClientAPIImpl mqClientAPIImpl = mqClientInstance.getMQClientAPIImpl();
MQAdminImpl mqAdminImpl = mqClientInstance.getMQAdminImpl();
// 获得当前的topic集合列表
printTopicList(mqClientAPIImpl);
// 删除主题
try {
deleteTopicByName(mqClientAPIImpl,"hello");
} catch (Exception e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
printTopicList(mqClientAPIImpl);
// 创建主题(和更新是在一起的)
try { createTopic(mqAdminImpl,mqClientAPIImpl, "hello"); } catch (Exception e) {
e.printStackTrace(); }
printTopicList(mqClientAPIImpl);
// getmQClientFactory.shutdown();
try {
printAllTopicInfo(mqClientAPIImpl);
} catch (Exception e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
mqClientInstance.shutdown();
consumer.shutdown();
}
public static Set<String> getTopics(MQClientAPIImpl mqClientAPIImpl) {
Set<String> topics = new HashSet<>();
long timeoutMillis = 2000L;
try {
TopicList topicList = mqClientAPIImpl.getTopicListFromNameServer(timeoutMillis);
// String brokerAddr = topicList.getBrokerAddr();
Set<String> topicSet = topicList.getTopicList();
topics.addAll(topicSet);
// System.out.println("成功获取到地址:"+brokerAddr+"的所有的topic");
// System.out.println(topicSet);
TopicList systemTopicList = mqClientAPIImpl.getSystemTopicList(timeoutMillis);
Set<String> sysTopicSet = systemTopicList.getTopicList();
// System.out.println("输出当前系统的topic");
// System.out.println(sysTopicSet);
topics.addAll(sysTopicSet);
return topics;
} catch (RemotingException | MQClientException | InterruptedException e) {
// TODO Auto-generated catch block
e.printStackTrace();
throw new RuntimeException(e);
}
}
// 输出当前客户端获取到的topic
public static void printTopicList(MQClientAPIImpl mqClientAPIImpl) {
Set<String> topics = getTopics(mqClientAPIImpl);
System.out.println("打印所有的主题");
System.out.println(topics);
}
public static void printAllTopicInfo(MQClientAPIImpl mqClientAPIImpl) throws Exception {
TopicConfigSerializeWrapper allTopicConfig = mqClientAPIImpl.getAllTopicConfig(brokerAddr, 6000L);
ConcurrentMap<String,TopicConfig> topicConfigTable = allTopicConfig.getTopicConfigTable();
System.out.println(topicConfigTable);
}
// 直接创建一个topic(注意创建topic需要使用MQAdminImpl创建,并需要一个默认的topic才行,但是mqClientAPIImpl却执行失败)
public static void createTopic(MQAdminImpl mqAdminImpl,MQClientAPIImpl mqClientAPIImpl, String topicName) throws Exception {
Set<String> topics = getTopics(mqClientAPIImpl);
if (!topics.contains(topicName)) {
// 这个可以
//mqAdminImpl.createTopic(defaultTopicName,topicName, 16);
// 注意使用brokerAddr
mqClientAPIImpl.createTopic(brokerAddr, defaultTopicName, new TopicConfig(topicName), 2000L);
} else {
System.out.println("已存在topic名称为" + topicName + "的主题,无法执行添加操作!");
}
}
// 手动删除topic操作,可以执行删除的操作
public static void deleteTopicByName(MQClientAPIImpl mqClientAPIImpl, String topicName) throws Exception {
Set<String> topics = getTopics(mqClientAPIImpl);
if (topics.contains(topicName)) {
// 执行删除
mqClientAPIImpl.deleteTopicInBroker(brokerAddr, topicName, 2000L);
//mqClientAPIImpl.deleteTopicInNameServer(namesrvAddr, topicName, 2000L);
} else {
System.out.println("没有这个topic名称为" + topicName + "的主题,无法执行删除操作!");
}
}
4. 测试
操作成功
RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
打印所有的主题
[localhost, RMQ_SYS_TRANS_HALF_TOPIC, BenchmarkTest, OFFSET_MOVED_EVENT, TRANS_CHECK_MAX_TIME_TOPIC, TBW102, SELF_TEST_TOPIC, DefaultCluster, SCHEDULE_TOPIC_XXXX, RMQ_SYS_TRACE_TOPIC, DefaultCluster_REPLY_TOPIC, %RETRY%testGroup, RMQ_SYS_TRANS_OP_HALF_TOPIC, TopicTest, hello]
打印所有的主题
[localhost, RMQ_SYS_TRANS_HALF_TOPIC, BenchmarkTest, OFFSET_MOVED_EVENT, TRANS_CHECK_MAX_TIME_TOPIC, TBW102, SELF_TEST_TOPIC, DefaultCluster, SCHEDULE_TOPIC_XXXX, RMQ_SYS_TRACE_TOPIC, DefaultCluster_REPLY_TOPIC, %RETRY%testGroup, RMQ_SYS_TRANS_OP_HALF_TOPIC, TopicTest, hello]
已存在topic名称为hello的主题,无法执行添加操作!
打印所有的主题
[localhost, RMQ_SYS_TRANS_HALF_TOPIC, BenchmarkTest, OFFSET_MOVED_EVENT, TRANS_CHECK_MAX_TIME_TOPIC, TBW102, SELF_TEST_TOPIC, DefaultCluster, SCHEDULE_TOPIC_XXXX, RMQ_SYS_TRACE_TOPIC, DefaultCluster_REPLY_TOPIC, %RETRY%testGroup, RMQ_SYS_TRANS_OP_HALF_TOPIC, TopicTest, hello]
{localhost=TopicConfig [topicName=localhost, readQueueNums=1, writeQueueNums=1, perm=RWX, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], SCHEDULE_TOPIC_XXXX=TopicConfig [topicName=SCHEDULE_TOPIC_XXXX, readQueueNums=18, writeQueueNums=18, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], RMQ_SYS_TRANS_HALF_TOPIC=TopicConfig [topicName=RMQ_SYS_TRANS_HALF_TOPIC, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], DefaultCluster_REPLY_TOPIC=TopicConfig [topicName=DefaultCluster_REPLY_TOPIC, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], %RETRY%testGroup=TopicConfig [topicName=%RETRY%testGroup, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], BenchmarkTest=TopicConfig [topicName=BenchmarkTest, readQueueNums=1024, writeQueueNums=1024, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], OFFSET_MOVED_EVENT=TopicConfig [topicName=OFFSET_MOVED_EVENT, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], TopicTest=TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], TBW102=TopicConfig [topicName=TBW102, readQueueNums=8, writeQueueNums=8, perm=RWX, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], SELF_TEST_TOPIC=TopicConfig [topicName=SELF_TEST_TOPIC, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], DefaultCluster=TopicConfig [topicName=DefaultCluster, readQueueNums=16, writeQueueNums=16, perm=RWX, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]}
其中出现的问题:org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <192.168.1.102:9876> timeout, 6000(ms)
,都是由于无法连接导致的,出现的问题就是必须切换为broker的地址进访问
以上是关于Apache RocketMQ:使用api实现对Topic的基本操作的主要内容,如果未能解决你的问题,请参考以下文章
精华推荐 | 深入浅出RocketMQ原理及实战「底层原理挖掘系列」透彻剖析贯穿RocketMQ的存储系统的实现原理和持久化机制