RocketMq Topic创建和删除
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMq Topic创建和删除相关的知识,希望对你有一定的参考价值。
参考技术A
Topic创建的核心步骤如下
删除Topic的核心逻辑如下
🏆Alibaba中间件技术系列「RocketMQ技术专题」Broker服务端自动创建topic的原理分析和问题要点指南
前提背景
使用RocketMQ进行发消息时,一般我们是必须要指定topic,此外topic必须要提前建立,但是topic的创建(自动或者手动方式)的设置有一个开关autoCreateTopicEnable,此部分主要会在broker节点的配置文件的时候进行设置,运行环境中会使用默认设置autoCreateTopicEnable = true,但是这样就会导致topic的设置不容易规范管理,所以在生产环境中会在Broker设置参数autoCreateTopicEnable = false。那么如果此参数稍有偏差,或者没有提前手动创建topic,则会频繁出现No route info of this topic这个错误,那么接下来我们探索一下此问题的出现原因以及系统如何进行创建topic。
No route info of this topic
相信做过RocketMQ项目的小伙伴们,可能对No route info of this topic一点都不陌生,说明的含义起始就是无法解析或者路由这个topic,但是造成的原因有很多种。
没有配置NameServer服务
Broker启动时我们没有配置NameSrv地址,发送程序会报错:No route info of this topic。但当我们配上NameSrv地址后,再次启动,可以正常发送消息。
没有建立autoCreateTopicEnable=true且没有创建该topic
当autoCreateTopicEnable=false时,DefaultMQProducerImpl.sendDefaultImpl,当发消息的时候肯定先要获取关于topic的一些信息,比如有几个消息队列,是不时有序topic,有这个topic的Broker列表等,当获取不到正确的信息时,就会抛出异常
RocketMQ的客户端版本与服务端版本不一致
RocketMQ Java客户端调用No route info of this topic错误(原因版本不一致)。此时,即使启动broker的时候设置autoCreateTopicEnable=true也没有用,假如,使用的rocketmq的版本是4.9.0,java client端版本4.3.0
RocketMQ 4.3.0版本的自动创建(autoCreateTopicEnable),客户端传递使用的AUTO_CREATE_TOPIC_KEY_TOPIC是”AUTO_CREATE_TOPIC_KEY”,新版本的client,客户端传递的默认AUTO_CREATE_TOPIC_KEY_TOPIC是“TBW102”。
org.apache.rocketmq.client.producer.DefaultMQProducer#createTopicKey
org.apache.rocketmq.common.MixAll#AUTO_CREATE_TOPIC_KEY_TOPIC
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
实际代码
> 4.4.0版本
<=4.3.0版本
方案1:要不就进行调整client客户端版本的version
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.1</version>
</dependency>
方案2:调整自动创建代码为AUTO_CREATE_TOPIC_KEY
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unidcque_group_name");
//设置自动创建topic的key值
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");
Topic之前并未创建过,Broker未配置NameSrv地址,无法发送,而配置NameSrv后则可以正常发送。这中间有2个问题:
1、topic是怎么自动创建的?
2、topic自动创建过程中Broker、NameSrv如何协作配合的?
分析以下如何自动创建topic的源码流程
RocketMQ基本路由规则
-
Broker在启动时向Nameserver注册存储在该服务器上的路由信息,并每隔30s向Nameserver发送心跳包,并更新路由信息。
Nameserver每隔10s扫描路由表,如果检测到Broker服务宕机,则移除对应的路由信息。 -
消息生产者每隔30s会从Nameserver重新拉取Topic的路由信息并更新本地路由表;在消息发送之前,如果本地路由表中不存在对应主题的路由消息时,会主动向Nameserver拉取该主题的消息。
-
如果autoCreateTopicEnable设置为true,消息发送者向NameServer查询主题的路由消息返回空时,会尝试用一个系统默认的主题名称(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此时消息发送者得到的路由信息为:
默认Topic的路由信息是如何创建的?
Nameserver?broker?当autoCreateTopicEnable=false时,DefaultMQProducerImpl.sendDefaultImpl,当发消息的时候肯定先要获取关于topic的一些信息,比如有几个消息队列,是不时有序topic,有这个topic的Broker列表等,当获取不到正确的信息时,就会抛出异常
private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 如果获取到topic的路由信息,则发送,否则抛异常
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok())
... ...
List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty())
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
tryToFindTopicPublishInfo是发送的关键,如果获取到topic的信息,则发送,否则就异常;因此之前No route info of this topic的异常,就是Producer获取不到Topic的信息,导致发送失败。
先从topicPublishInfoTable缓存中获取
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic)
// topicPublishInfoTable是Producer本地缓存的topic信息表
// Producer启动后,会添加默认的topic:TBW102
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok())
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 未获取到,从NameSrv获取该topic的信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
// 获取到了,则返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok())
return topicPublishInfo;
else
// 没获取到,再换种方式从NameSrv获取
// 如果再获取不到,那后续就无法发送了
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
-
Producer本地topicPublishInfoTable变量中没有topic的信息,只缓存了TBW102。
-
尝试从NameSrv获取Topic的信息。获取失败,NameSrv中根本没有Topic,因为这个topic是Producer发送时设置的,没有同步到NameSrv。
-
再换种方式从NameSrv获取,如果获取到了,那么可以执行发送流程,如果还是没有获取到,就会抛No route info of this topic的异常了。
再从NameServer服务中进行获取
public boolean updateTopicRouteInfoFromNameServer(final String topic)
return updateTopicRouteInfoFromNameServer(topic, false, null);
-
第1次获取时,isDefault传的false,defaultMQProducer传的null,因此在updateTopicRouteInfoFromNameServer会走else分支,用Topic去获取
-
第2次获取时,isDefault传的true,defaultMQProducer也传值了,因此会走if分支,将入参的topic转换为默认的TBW102,获取TBW102的信息
-
不管Broker配没配NameSrv地址,获取Topic的信息,必失败
-
获取TBW102信息:
- 2.1 Broker配置了NameSrv地址,成功
- 2.2 Broker没有配置NameSrv地址,失败
生产者首先向NameServer查询路由信息,由于是一个不存在的主题,故此时返回的路由信息为空,RocketMQ会使用默认的主题再次寻找,由于开启了自动创建路由信息,NameServer会向生产者返回默认主题的路由信息。
然后从返回的路由信息中选择一个队列(默认轮询)。消息发送者从Nameserver获取到默认的Topic的队列信息后,队列的个数会改变吗?
从NameServer中获取,注意这个isDefault=false,defaultMQProducer=null
温馨提示:消息发送者在到默认路由信息时,其队列数量,会选择DefaultMQProducer#defaultTopicQueueNums与Nameserver返回的的队列数取最小值,DefaultMQProducer#defaultTopicQueueNums默认值为4,故自动创建的主题,其队列数量默认为4。
获取消息对应的topic信息
发请求RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader)
,但是因为没有任何一个Broker有关于这个topic的信息,所以namesrv就会返回topic不存在,处理请求的代码在DefaultRequestProcessor的。
case RequestCode.GET_ROUTEINTO_BY_TOPIC: return this.getRouteInfoByTopic(ctx, request);
也就是回应码ResponseCode.TOPIC_NOT_EXIST,然后抛出异常 throw new MQClientException(response.getCode(), response.getRemark());被捕获之后退出返回false。
从NameServer获取相关的Topic信息数据
updateTopicRouteInfoFromNameServer最终会发给NameSrv一个GET_ROUTEINTO_BY_TOPIC请求
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer)
try
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
try
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null)
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null)
for (QueueData data : topicRouteData.getQueueDatas())
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);
else
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
catch (Exception e)
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC))
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
finally
this.lockNamesrv.unlock();
else
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout ms", LOCK_TIMEOUT_MILLIS);
catch (InterruptedException e)
log.warn("updateTopicRouteInfoFromNameServer Exception", e);
return false;
因为if条件不满足,所以获取默认的topic信息,注意isDefault=true,defaultMQProducer=defaultMQProducer
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok())
return topicPublishInfo;
else
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;
默认的topic为"TBW102",这个时候如果namesrv中如果还是没有这个topic的信息的话,就会抛出异常No route info of this topic。
autoCreateTopicEnable=true的作用。
Broker启动流程自动创建topic
-
在Broker启动流程中,会构建TopicConfigManager对象,其构造方法中首先会判断是否开启了允许自动创建主题,如果启用了自动创建主题,则向topicConfigTable中添加默认主题的路由信息。
-
当Broker启动时,TopicConfigManager初始化,这里会判断该标识,创建TBW102topic,并且在后续的心跳中把信息更新到namesrv中,这样在发消息的时候就不会抛出不存在的异常。
// MixAll.DEFAULT_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable())
String topic = MixAll.DEFAULT_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
该topicConfigTable中所有的路由信息,会随着Broker向Nameserver发送心跳包中,Nameserver收到这些信息后,更新对应Topic的路由信息表。
BrokerConfig的defaultTopicQueueNum默认为8。两台Broker服务器都会运行上面的过程,故最终Nameserver中关于默认主题的路由信息中,会包含两个Broker分别各8个队列信息。
TopicConfigManager构造方法
当从namesrv查出Topic相关的信息时,在topicRouteData2TopicPublishInfo设置消息队列数量 info.getMessageQueueList().add(mq);,调用updateTopicPublishInfo方法更新缓存topicPublishInfoTable
// Update Pub info
TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext())
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null)
impl.updateTopicPublishInfo(topic, publishInfo);
然后if (topicPublishInfo != null && topicPublishInfo.ok()) 这个条件就会符合,那个异常就不会抛出。
当autoCreateTopicEnable=false时
- 创建topic的类UpdateTopicSubCommand(),设置相应的信息,最后调用defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
- 发消息RequestCode.UPDATE_AND_CREATE_TOPIC,AdminBrokerProcessor处理消息 case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request);
- 同步给其他Broker
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerBrokerAll(false, true);
Broker端收到消息后的处理流程
服务端收到消息发送的处理器为:SendMessageProcessor,在处理消息发送时,会调用super.msgCheck方法:
AbstractSendMessageProcessor#msgCheck
在Broker端,首先会使用TopicConfigManager根据topic查询路由信息,如果Broker端不存在该主题的路由配置(路由信息),此时如果Broker中存在默认主题的路由配置信息,则根据消息发送请求中的队列数量,在Broker创建新Topic的路由信息。这样Broker服务端就会存在主题的路由信息。
在Broker端的topic配置管理器中存在的路由信息,一会向Nameserver发送心跳包,汇报到Nameserver,另一方面会有一个定时任务,定时存储在broker端,具体路径为$ROCKET_HOME/store/config/topics.json中,这样在Broker关闭后再重启,并不会丢失路由信息。
TBW102是为何物?
TBW102是Broker启动时,当autoCreateTopicEnable的配置为true时,会自动创建该默认topic。
public TopicConfigManager(BrokerController brokerController)
this.brokerController = brokerController;
// ...
// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable())
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
// ...
autoCreateTopicEnable的默认值是true,可以同步外部配置文件,让Broker启动时加载,来改变该值。我理解的TBW102的作用是当开启自动创建topic功能,发送时用了未配置的topic,可以让该topic继承默认TBW102的配置,实现消息的发送。
总结分析
-
client本地首先没有缓存对应topic的路由信息,然后先去nameserver去查找,nameserver中也没有此topic的路由信息,然后返回给client。client接收到返回后再向nameserver请求topic为tbw102的路由信息。
-
如果有broker设置了autocreateTopic,则broker在启动的时候会在topicManager中创建对应的topicconfig通过心跳发送给nameserver,namerserver会将其保存。nameserver将之前保存的tbw102的路由信息返回给请求的client。
-
client拿到了topic为tbw102的路由信息后返回,client根据返回的tbw102路由信息(里面包含所有设置了autocreateTopic为true的broker,默认每个broker会在client本地创建DefaultTopicQueueNums=4个读写队列选择,假设两个broker则会有8个队列让你选择)先缓存到本地的topicPublishInfoTable表中,key为此topic ,value为此topicRouteData,轮询选择一个队列进行发送。
根据选择到的队列对应的broker发送该topic消息。
broker在接收到此消息后会在msgcheck方法中调用createTopicInSendMessageMethod方法创建topicConfig信息塞进topicConfigTable表中,然后就跟发送已经创建的topic的流程一样发送消息了。
同时topicConfigTable会通过心跳将新的这个topicConfig信息发送给nameserver。
nameserver接收到后会更新topic的路由信息,如果之前接收到消息的broker没有全部覆盖到,因为broker会30S向nameserver发送一次心跳,心跳包里包含topicconfig,覆盖到的broker会将自动创建好的topicconfig信息发送给nameserver,从而在nameserver那边接收到后会注册这个新的topic信息,因为消费者每30S也会到nameserver去更新本地的topicrouteinfo,请求发送到nameserver得到了之前覆盖到的broker发送的心跳包更新后的最新topic路由信息,那么未被覆盖的broker就永远不会加入到这个负载均衡了,就会造成负载均衡达不到预期了,即所有能自动创建topic的broker不能全部都参与进来。
参考资料
https://www.cnblogs.com/dingwpmz/p/11809404.html
以上是关于RocketMq Topic创建和删除的主要内容,如果未能解决你的问题,请参考以下文章
精华推荐 | 深入浅出RocketMQ原理及实战「底层原理挖掘系列」透彻剖析贯穿RocketMQ的Broker服务端自动创建topic的原理分析和问题要点指南