RocketMq Topic创建和删除

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMq Topic创建和删除相关的知识,希望对你有一定的参考价值。

参考技术A



Topic创建的核心步骤如下












删除Topic的核心逻辑如下








🏆Alibaba中间件技术系列「RocketMQ技术专题」Broker服务端自动创建topic的原理分析和问题要点指南

前提背景

使用RocketMQ进行发消息时,一般我们是必须要指定topic,此外topic必须要提前建立,但是topic的创建(自动或者手动方式)的设置有一个开关autoCreateTopicEnable,此部分主要会在broker节点的配置文件的时候进行设置,运行环境中会使用默认设置autoCreateTopicEnable = true,但是这样就会导致topic的设置不容易规范管理,所以在生产环境中会在Broker设置参数autoCreateTopicEnable = false。那么如果此参数稍有偏差,或者没有提前手动创建topic,则会频繁出现No route info of this topic这个错误,那么接下来我们探索一下此问题的出现原因以及系统如何进行创建topic。

No route info of this topic

相信做过RocketMQ项目的小伙伴们,可能对No route info of this topic一点都不陌生,说明的含义起始就是无法解析或者路由这个topic,但是造成的原因有很多种。

没有配置NameServer服务

Broker启动时我们没有配置NameSrv地址,发送程序会报错:No route info of this topic。但当我们配上NameSrv地址后,再次启动,可以正常发送消息。

没有建立autoCreateTopicEnable=true且没有创建该topic

当autoCreateTopicEnable=false时,DefaultMQProducerImpl.sendDefaultImpl,当发消息的时候肯定先要获取关于topic的一些信息,比如有几个消息队列,是不时有序topic,有这个topic的Broker列表等,当获取不到正确的信息时,就会抛出异常

RocketMQ的客户端版本与服务端版本不一致

RocketMQ Java客户端调用No route info of this topic错误(原因版本不一致)。此时,即使启动broker的时候设置autoCreateTopicEnable=true也没有用,假如,使用的rocketmq的版本是4.9.0,java client端版本4.3.0

RocketMQ 4.3.0版本的自动创建(autoCreateTopicEnable),客户端传递使用的AUTO_CREATE_TOPIC_KEY_TOPIC是”AUTO_CREATE_TOPIC_KEY”,新版本的client,客户端传递的默认AUTO_CREATE_TOPIC_KEY_TOPIC是“TBW102”。

org.apache.rocketmq.client.producer.DefaultMQProducer#createTopicKey
org.apache.rocketmq.common.MixAll#AUTO_CREATE_TOPIC_KEY_TOPIC
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";

实际代码

> 4.4.0版本

<=4.3.0版本

方案1:要不就进行调整client客户端版本的version

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>

方案2:调整自动创建代码为AUTO_CREATE_TOPIC_KEY

DefaultMQProducer producer = new DefaultMQProducer("please_rename_unidcque_group_name");
//设置自动创建topic的key值
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

Topic之前并未创建过,Broker未配置NameSrv地址,无法发送,而配置NameSrv后则可以正常发送。这中间有2个问题:
1、topic是怎么自动创建的?
2、topic自动创建过程中Broker、NameSrv如何协作配合的?

分析以下如何自动创建topic的源码流程

RocketMQ基本路由规则

  1. Broker在启动时向Nameserver注册存储在该服务器上的路由信息,并每隔30s向Nameserver发送心跳包,并更新路由信息。
    Nameserver每隔10s扫描路由表,如果检测到Broker服务宕机,则移除对应的路由信息。

  2. 消息生产者每隔30s会从Nameserver重新拉取Topic的路由信息并更新本地路由表;在消息发送之前,如果本地路由表中不存在对应主题的路由消息时,会主动向Nameserver拉取该主题的消息。

  3. 如果autoCreateTopicEnable设置为true,消息发送者向NameServer查询主题的路由消息返回空时,会尝试用一个系统默认的主题名称(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此时消息发送者得到的路由信息为:

默认Topic的路由信息是如何创建的?

Nameserver?broker?当autoCreateTopicEnable=false时,DefaultMQProducerImpl.sendDefaultImpl,当发消息的时候肯定先要获取关于topic的一些信息,比如有几个消息队列,是不时有序topic,有这个topic的Broker列表等,当获取不到正确的信息时,就会抛出异常

 private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException 
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        // 如果获取到topic的路由信息,则发送,否则抛异常
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) 
           ... ...
        
        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) 
            throw new MQClientException(
                "No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);
        
        throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    

tryToFindTopicPublishInfo是发送的关键,如果获取到topic的信息,则发送,否则就异常;因此之前No route info of this topic的异常,就是Producer获取不到Topic的信息,导致发送失败。

先从topicPublishInfoTable缓存中获取

private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) 
    // topicPublishInfoTable是Producer本地缓存的topic信息表
    // Producer启动后,会添加默认的topic:TBW102
    TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
    if (null == topicPublishInfo || !topicPublishInfo.ok()) 
        this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
        // 未获取到,从NameSrv获取该topic的信息
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
    
    // 获取到了,则返回
    if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) 
        return topicPublishInfo;
     else 
        // 没获取到,再换种方式从NameSrv获取
        // 如果再获取不到,那后续就无法发送了
        this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
        topicPublishInfo = this.topicPublishInfoTable.get(topic);
        return topicPublishInfo;
    

  1. Producer本地topicPublishInfoTable变量中没有topic的信息,只缓存了TBW102。

  2. 尝试从NameSrv获取Topic的信息。获取失败,NameSrv中根本没有Topic,因为这个topic是Producer发送时设置的,没有同步到NameSrv。

  3. 再换种方式从NameSrv获取,如果获取到了,那么可以执行发送流程,如果还是没有获取到,就会抛No route info of this topic的异常了。

再从NameServer服务中进行获取

public boolean updateTopicRouteInfoFromNameServer(final String topic) 
        return updateTopicRouteInfoFromNameServer(topic, false, null);

  1. 第1次获取时,isDefault传的false,defaultMQProducer传的null,因此在updateTopicRouteInfoFromNameServer会走else分支,用Topic去获取

  2. 第2次获取时,isDefault传的true,defaultMQProducer也传值了,因此会走if分支,将入参的topic转换为默认的TBW102,获取TBW102的信息

  3. 不管Broker配没配NameSrv地址,获取Topic的信息,必失败

  4. 获取TBW102信息:

    • 2.1 Broker配置了NameSrv地址,成功
    • 2.2 Broker没有配置NameSrv地址,失败

生产者首先向NameServer查询路由信息,由于是一个不存在的主题,故此时返回的路由信息为空,RocketMQ会使用默认的主题再次寻找,由于开启了自动创建路由信息,NameServer会向生产者返回默认主题的路由信息。

然后从返回的路由信息中选择一个队列(默认轮询)。消息发送者从Nameserver获取到默认的Topic的队列信息后,队列的个数会改变吗?

从NameServer中获取,注意这个isDefault=false,defaultMQProducer=null

温馨提示:消息发送者在到默认路由信息时,其队列数量,会选择DefaultMQProducer#defaultTopicQueueNums与Nameserver返回的的队列数取最小值,DefaultMQProducer#defaultTopicQueueNums默认值为4,故自动创建的主题,其队列数量默认为4。

获取消息对应的topic信息

发请求RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader),但是因为没有任何一个Broker有关于这个topic的信息,所以namesrv就会返回topic不存在,处理请求的代码在DefaultRequestProcessor的。

case RequestCode.GET_ROUTEINTO_BY_TOPIC:  return this.getRouteInfoByTopic(ctx, request);

也就是回应码ResponseCode.TOPIC_NOT_EXIST,然后抛出异常 throw new MQClientException(response.getCode(), response.getRemark());被捕获之后退出返回false。

从NameServer获取相关的Topic信息数据

updateTopicRouteInfoFromNameServer最终会发给NameSrv一个GET_ROUTEINTO_BY_TOPIC请求

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) 
        try 
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) 
                try 
                    TopicRouteData topicRouteData;
                    if (isDefault && defaultMQProducer != null) 
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            1000 * 3);
                        if (topicRouteData != null) 
                            for (QueueData data : topicRouteData.getQueueDatas()) 
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums);
                                data.setWriteQueueNums(queueNums);
                            
                        
                     else 
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                    
                 catch (Exception e) 
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC)) 
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    
                 finally 
                    this.lockNamesrv.unlock();
                
             else 
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout ms", LOCK_TIMEOUT_MILLIS);
            
         catch (InterruptedException e) 
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        
        return false;
    

因为if条件不满足,所以获取默认的topic信息,注意isDefault=true,defaultMQProducer=defaultMQProducer

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) 
            return topicPublishInfo;
         else 
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            return topicPublishInfo;

默认的topic为"TBW102",这个时候如果namesrv中如果还是没有这个topic的信息的话,就会抛出异常No route info of this topic。
autoCreateTopicEnable=true的作用。

Broker启动流程自动创建topic

  • 在Broker启动流程中,会构建TopicConfigManager对象,其构造方法中首先会判断是否开启了允许自动创建主题,如果启用了自动创建主题,则向topicConfigTable中添加默认主题的路由信息。

  • 当Broker启动时,TopicConfigManager初始化,这里会判断该标识,创建TBW102topic,并且在后续的心跳中把信息更新到namesrv中,这样在发消息的时候就不会抛出不存在的异常。

 // MixAll.DEFAULT_TOPIC
            if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) 
                String topic = MixAll.DEFAULT_TOPIC;
                TopicConfig topicConfig = new TopicConfig(topic);
                this.systemTopicList.add(topic);
                topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                    .getDefaultTopicQueueNums());
                int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
                topicConfig.setPerm(perm);
                this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
            

该topicConfigTable中所有的路由信息,会随着Broker向Nameserver发送心跳包中,Nameserver收到这些信息后,更新对应Topic的路由信息表。

BrokerConfig的defaultTopicQueueNum默认为8。两台Broker服务器都会运行上面的过程,故最终Nameserver中关于默认主题的路由信息中,会包含两个Broker分别各8个队列信息。

TopicConfigManager构造方法

当从namesrv查出Topic相关的信息时,在topicRouteData2TopicPublishInfo设置消息队列数量 info.getMessageQueueList().add(mq);,调用updateTopicPublishInfo方法更新缓存topicPublishInfoTable

 // Update Pub info
                            
                                TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
                                publishInfo.setHaveTopicRouterInfo(true);
                                Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
                                while (it.hasNext()) 
                                    Entry<String, MQProducerInner> entry = it.next();
                                    MQProducerInner impl = entry.getValue();
                                    if (impl != null) 
                                        impl.updateTopicPublishInfo(topic, publishInfo);
                                    
                                
                            

然后if (topicPublishInfo != null && topicPublishInfo.ok()) 这个条件就会符合,那个异常就不会抛出。

当autoCreateTopicEnable=false时

  1. 创建topic的类UpdateTopicSubCommand(),设置相应的信息,最后调用defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
  2. 发消息RequestCode.UPDATE_AND_CREATE_TOPIC,AdminBrokerProcessor处理消息 case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request);
  3. 同步给其他Broker
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerBrokerAll(false, true);

Broker端收到消息后的处理流程

服务端收到消息发送的处理器为:SendMessageProcessor,在处理消息发送时,会调用super.msgCheck方法:

AbstractSendMessageProcessor#msgCheck

在Broker端,首先会使用TopicConfigManager根据topic查询路由信息,如果Broker端不存在该主题的路由配置(路由信息),此时如果Broker中存在默认主题的路由配置信息,则根据消息发送请求中的队列数量,在Broker创建新Topic的路由信息。这样Broker服务端就会存在主题的路由信息。

在Broker端的topic配置管理器中存在的路由信息,一会向Nameserver发送心跳包,汇报到Nameserver,另一方面会有一个定时任务,定时存储在broker端,具体路径为$ROCKET_HOME/store/config/topics.json中,这样在Broker关闭后再重启,并不会丢失路由信息。

TBW102是为何物?

TBW102是Broker启动时,当autoCreateTopicEnable的配置为true时,会自动创建该默认topic。

public TopicConfigManager(BrokerController brokerController) 
    this.brokerController = brokerController;
    // ...
    
        // MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
        if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable()) 
            String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
            TopicConfig topicConfig = new TopicConfig(topic);
            this.systemTopicList.add(topic);
            topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
                .getDefaultTopicQueueNums());
            topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
                .getDefaultTopicQueueNums());
            int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
            topicConfig.setPerm(perm);
            this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);
        
    
    // ...

autoCreateTopicEnable的默认值是true,可以同步外部配置文件,让Broker启动时加载,来改变该值。我理解的TBW102的作用是当开启自动创建topic功能,发送时用了未配置的topic,可以让该topic继承默认TBW102的配置,实现消息的发送。

总结分析

  1. client本地首先没有缓存对应topic的路由信息,然后先去nameserver去查找,nameserver中也没有此topic的路由信息,然后返回给client。client接收到返回后再向nameserver请求topic为tbw102的路由信息。

  2. 如果有broker设置了autocreateTopic,则broker在启动的时候会在topicManager中创建对应的topicconfig通过心跳发送给nameserver,namerserver会将其保存。nameserver将之前保存的tbw102的路由信息返回给请求的client。

  3. client拿到了topic为tbw102的路由信息后返回,client根据返回的tbw102路由信息(里面包含所有设置了autocreateTopic为true的broker,默认每个broker会在client本地创建DefaultTopicQueueNums=4个读写队列选择,假设两个broker则会有8个队列让你选择)先缓存到本地的topicPublishInfoTable表中,key为此topic ,value为此topicRouteData,轮询选择一个队列进行发送。

根据选择到的队列对应的broker发送该topic消息。

broker在接收到此消息后会在msgcheck方法中调用createTopicInSendMessageMethod方法创建topicConfig信息塞进topicConfigTable表中,然后就跟发送已经创建的topic的流程一样发送消息了。

同时topicConfigTable会通过心跳将新的这个topicConfig信息发送给nameserver。

nameserver接收到后会更新topic的路由信息,如果之前接收到消息的broker没有全部覆盖到,因为broker会30S向nameserver发送一次心跳,心跳包里包含topicconfig,覆盖到的broker会将自动创建好的topicconfig信息发送给nameserver,从而在nameserver那边接收到后会注册这个新的topic信息,因为消费者每30S也会到nameserver去更新本地的topicrouteinfo,请求发送到nameserver得到了之前覆盖到的broker发送的心跳包更新后的最新topic路由信息,那么未被覆盖的broker就永远不会加入到这个负载均衡了,就会造成负载均衡达不到预期了,即所有能自动创建topic的broker不能全部都参与进来。

参考资料

https://www.cnblogs.com/dingwpmz/p/11809404.html

https://www.pianshen.com/article/24191855587/

https://www.jianshu.com/p/c8fd57a7f741

极限就是为了超越而存在的

以上是关于RocketMq Topic创建和删除的主要内容,如果未能解决你的问题,请参考以下文章

springboot消费者偶发连接rocket失败

消息队列RocketMQ如何存储消息

Rocket详细教程

rocketMq-Topic创建过程

精华推荐 | 深入浅出RocketMQ原理及实战「底层原理挖掘系列」透彻剖析贯穿RocketMQ的Broker服务端自动创建topic的原理分析和问题要点指南

深度解析RocketMQ Topic的创建机制