Apache RocketMQ:使用api实现对Topic的基本操作

Posted 你是小KS

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Apache RocketMQ:使用api实现对Topic的基本操作相关的知识,希望对你有一定的参考价值。

当前rocketmq版本4.9

1. 声明

当前内容主要为本人学习Rocketmq之用,当前内容部分参考官方文档

pom依赖和前面的一样:之前的pom依赖

2. 重要的概念

就是Name Server就是一个名称服务器群组,所有的broker都注册到名称服务器中,名称服务器控制灾难恢复(保存元数据信息)

就是领导者,主要控制集群控制,实际数据的读写等操作

  1. name server 默认启动在9876
  2. broker server 默认启动在10911

所以如果在Rocketmq中只要出现与外界交互的参数为addr的都是broker服务器,声明了namesrvaddr的才是名称服务器,这个对在使用api操作的时候非常有用

3. Topic的基本操作Demo

本次采用的rocketmq的地址,使用单个进行测试

ip端口用处
192.168.1.1029876name server
192.168.1.10210911broker server
private static String namesrvAddr = "192.168.1.102:9876";
	private static String brokerAddr = "192.168.1.102:10911";
	private static String defaultTopicName = "localhost";

	public static void main(String[] args) throws MQClientException {
		DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("testGroup");
		consumer.setNamesrvAddr(namesrvAddr);
		ClientConfig clientConfig = consumer.cloneClientConfig();
		String buildMQClientId = consumer.buildMQClientId();
		MQClientInstance mqClientInstance = new MQClientInstance(clientConfig, 0, buildMQClientId);
		// 这个必须要开启
		mqClientInstance.start();
		MQClientAPIImpl mqClientAPIImpl = mqClientInstance.getMQClientAPIImpl();
		MQAdminImpl mqAdminImpl = mqClientInstance.getMQAdminImpl();
		// 获得当前的topic集合列表
		printTopicList(mqClientAPIImpl);
		
		// 删除主题
		try {
			deleteTopicByName(mqClientAPIImpl,"hello");
		} catch (Exception e1) {
			// TODO Auto-generated catch block
			e1.printStackTrace();
		}
		printTopicList(mqClientAPIImpl);
		// 创建主题(和更新是在一起的)
		  try { createTopic(mqAdminImpl,mqClientAPIImpl, "hello"); } catch (Exception e) {
		  e.printStackTrace(); }
		 

		printTopicList(mqClientAPIImpl);

		// getmQClientFactory.shutdown();
		try {
			printAllTopicInfo(mqClientAPIImpl);
		} catch (Exception e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		mqClientInstance.shutdown();
		consumer.shutdown();
	}

	public static Set<String> getTopics(MQClientAPIImpl mqClientAPIImpl) {
		Set<String> topics = new HashSet<>();
		long timeoutMillis = 2000L;
		try {
			TopicList topicList = mqClientAPIImpl.getTopicListFromNameServer(timeoutMillis);
			// String brokerAddr = topicList.getBrokerAddr();
			Set<String> topicSet = topicList.getTopicList();
			topics.addAll(topicSet);
			// System.out.println("成功获取到地址:"+brokerAddr+"的所有的topic");
			// System.out.println(topicSet);
			TopicList systemTopicList = mqClientAPIImpl.getSystemTopicList(timeoutMillis);
			Set<String> sysTopicSet = systemTopicList.getTopicList();
			// System.out.println("输出当前系统的topic");
			// System.out.println(sysTopicSet);
			topics.addAll(sysTopicSet);
			return topics;
		} catch (RemotingException | MQClientException | InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
			throw new RuntimeException(e);
		}
	}

	// 输出当前客户端获取到的topic
	public static void printTopicList(MQClientAPIImpl mqClientAPIImpl) {
		Set<String> topics = getTopics(mqClientAPIImpl);
		System.out.println("打印所有的主题");
		System.out.println(topics);
	}
	
	public static void printAllTopicInfo(MQClientAPIImpl mqClientAPIImpl) throws Exception {
		TopicConfigSerializeWrapper allTopicConfig = mqClientAPIImpl.getAllTopicConfig(brokerAddr, 6000L);
		ConcurrentMap<String,TopicConfig> topicConfigTable = allTopicConfig.getTopicConfigTable();
		System.out.println(topicConfigTable);
	}
	

	// 直接创建一个topic(注意创建topic需要使用MQAdminImpl创建,并需要一个默认的topic才行,但是mqClientAPIImpl却执行失败)
	public static void createTopic(MQAdminImpl mqAdminImpl,MQClientAPIImpl mqClientAPIImpl, String topicName) throws Exception {
		Set<String> topics = getTopics(mqClientAPIImpl);
		if (!topics.contains(topicName)) {
			// 这个可以
			//mqAdminImpl.createTopic(defaultTopicName,topicName, 16);
			// 注意使用brokerAddr
			mqClientAPIImpl.createTopic(brokerAddr, defaultTopicName, new TopicConfig(topicName), 2000L);
		} else {
			System.out.println("已存在topic名称为" + topicName + "的主题,无法执行添加操作!");
		}
	}

	// 手动删除topic操作,可以执行删除的操作
	public static void deleteTopicByName(MQClientAPIImpl mqClientAPIImpl, String topicName) throws Exception {
		Set<String> topics = getTopics(mqClientAPIImpl);
		if (topics.contains(topicName)) {
			// 执行删除
			mqClientAPIImpl.deleteTopicInBroker(brokerAddr, topicName, 2000L);
			//mqClientAPIImpl.deleteTopicInNameServer(namesrvAddr, topicName, 2000L);
		} else {
			System.out.println("没有这个topic名称为" + topicName + "的主题,无法执行删除操作!");
		}
	}

4. 测试

操作成功

RocketMQLog:WARN No appenders could be found for logger (io.netty.util.internal.PlatformDependent0).
RocketMQLog:WARN Please initialize the logger system properly.
打印所有的主题
[localhost, RMQ_SYS_TRANS_HALF_TOPIC, BenchmarkTest, OFFSET_MOVED_EVENT, TRANS_CHECK_MAX_TIME_TOPIC, TBW102, SELF_TEST_TOPIC, DefaultCluster, SCHEDULE_TOPIC_XXXX, RMQ_SYS_TRACE_TOPIC, DefaultCluster_REPLY_TOPIC, %RETRY%testGroup, RMQ_SYS_TRANS_OP_HALF_TOPIC, TopicTest, hello]
打印所有的主题
[localhost, RMQ_SYS_TRANS_HALF_TOPIC, BenchmarkTest, OFFSET_MOVED_EVENT, TRANS_CHECK_MAX_TIME_TOPIC, TBW102, SELF_TEST_TOPIC, DefaultCluster, SCHEDULE_TOPIC_XXXX, RMQ_SYS_TRACE_TOPIC, DefaultCluster_REPLY_TOPIC, %RETRY%testGroup, RMQ_SYS_TRANS_OP_HALF_TOPIC, TopicTest, hello]
已存在topic名称为hello的主题,无法执行添加操作!
打印所有的主题
[localhost, RMQ_SYS_TRANS_HALF_TOPIC, BenchmarkTest, OFFSET_MOVED_EVENT, TRANS_CHECK_MAX_TIME_TOPIC, TBW102, SELF_TEST_TOPIC, DefaultCluster, SCHEDULE_TOPIC_XXXX, RMQ_SYS_TRACE_TOPIC, DefaultCluster_REPLY_TOPIC, %RETRY%testGroup, RMQ_SYS_TRANS_OP_HALF_TOPIC, TopicTest, hello]
{localhost=TopicConfig [topicName=localhost, readQueueNums=1, writeQueueNums=1, perm=RWX, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], SCHEDULE_TOPIC_XXXX=TopicConfig [topicName=SCHEDULE_TOPIC_XXXX, readQueueNums=18, writeQueueNums=18, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], RMQ_SYS_TRANS_HALF_TOPIC=TopicConfig [topicName=RMQ_SYS_TRANS_HALF_TOPIC, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], DefaultCluster_REPLY_TOPIC=TopicConfig [topicName=DefaultCluster_REPLY_TOPIC, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], %RETRY%testGroup=TopicConfig [topicName=%RETRY%testGroup, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], BenchmarkTest=TopicConfig [topicName=BenchmarkTest, readQueueNums=1024, writeQueueNums=1024, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], OFFSET_MOVED_EVENT=TopicConfig [topicName=OFFSET_MOVED_EVENT, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], TopicTest=TopicConfig [topicName=TopicTest, readQueueNums=4, writeQueueNums=4, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], TBW102=TopicConfig [topicName=TBW102, readQueueNums=8, writeQueueNums=8, perm=RWX, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], SELF_TEST_TOPIC=TopicConfig [topicName=SELF_TEST_TOPIC, readQueueNums=1, writeQueueNums=1, perm=RW-, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false], DefaultCluster=TopicConfig [topicName=DefaultCluster, readQueueNums=16, writeQueueNums=16, perm=RWX, topicFilterType=SINGLE_TAG, topicSysFlag=0, order=false]}

其中出现的问题:org.apache.rocketmq.remoting.exception.RemotingTimeoutException: wait response on the channel <192.168.1.102:9876> timeout, 6000(ms),都是由于无法连接导致的,出现的问题就是必须切换为broker的地址进访问

以上是关于Apache RocketMQ:使用api实现对Topic的基本操作的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ简单实现

阿里RocketMQ是怎样孵化成Apache顶级项目的?

精华推荐 | 深入浅出RocketMQ原理及实战「底层原理挖掘系列」透彻剖析贯穿RocketMQ的存储系统的实现原理和持久化机制

Apache RocketMQ 正式开源分布式事务消息

始于阿里,回归社区 | Apache RocketMQ 开发者沙龙

RocketMQ(08)——日志输出到RocketMQ