rocketMq架构原理精华分析

Posted nandao158

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketMq架构原理精华分析相关的知识,希望对你有一定的参考价值。

rocketMq架构原理精华分析是我们这篇文章的核心,从消息中间件的对比、架构模型、消息模型、常见问题等逐一分析:

一、中间件对比:

RabbitMq 集群效果不太好,底层不是java 语言,研究原理比较困难;

Kafka是针对日志收集场景设计的,他的并发性能并不是很理想。尤其当他的Topic过多时,由于Partition文件也会过多,会严重影响IO性能;

RocketMQ的消息吞吐量虽然依然不如Kafka,但是却比RabbitMQ高很多。在阿里内部,RocketMQ集群每天处理的请求数超过5万亿次,支持的核心应用超过3000个。

RocketMQ是阿里巴巴开源的一个消息中间件,在阿里内部历经了双十一等很多高并发场景的考验,能够处理亿万级别的消息。2016年开源后捐赠给Apache,现在是Apache的一个顶级项目。
目前RocketMQ在阿里云上有一个购买即可用的商业版本,商业版本集成了阿里内部一些更深层次的功能及运维定制。我们这里学习的是Apache的开源版本。开源版本相对于阿里云上的商业版本,功能上略有缺失,但是大体上功能是一样的。

二、RocketMq 基本工作原理

RocketMQ由以下这几个组件组成:
NameServer : 提供轻量级的Broker路由服务。
Broker:实际处理消息存储、转发等服务的核心组件。
Producer:消息生产者集群。通常是业务系统中的一个功能模块。
Consumer:消息消费者集群。通常也是业务系统中的一个功能模块。
所以我们要启动RocketMQ服务,需要先启动NameServer,然后启动Broker。 

三、消息转发模型
1 消息模型(Message Model)
RocketMQ主要由 Producer、Broker、Consumer 三部分组成,其中Producer负责生产消息,Consumer 负责消费消息,Broker 负责存储消息。Broker 在实际部署过程中对应一台服务器,每个 Broker 可以存储多个Topic的消息,每个Topic的消息也可以分片存储于不同的 BrokerMessage Queue 用于存储消息的物理地址,每个Topic中的消息地址存储于多个 Message Queue 中。ConsumerGroup由多个Consumer 实例构成。


2 消息生产者(Producer)
负责生产消息,一般由业务系统负责生产消息。一个消息生产者会把业务应用系统里产生的消息发送到broker服务器。RocketMQ提供多种发送方式,同步发送、异步发送、顺序发送、单向发送。同步和异步方式均需要Broker返回确认信息,单向发送不需要。生产者中,会把同一类Producer组成一个集合,叫做生产者组。同一组的Producer被认为是发送同一类消息且发送逻辑一致。


3 消息消费者(Consumer)
负责消费消息,一般是后台系统负责异步消费。一个消息消费者会从Broker服务器拉取消息、并将其提供给应用程序。从用户应用的角度而言提供了两种消费形式:拉取式消费、推动式消费。拉取式消费的应用通常主动调用Consumer的拉消息方法从Broker服务器拉消息、主动权由应用控制。一旦获取了批量消息,应用就会启动消费过程。推动式消费模式下Broker收到数据后会主动推送给消费端,该消费模式一般实时性较高。消费者同样会把同一类Consumer组成一个集合,叫做消费者组,这类Consumer通常消费同一类消息且消费逻辑一致。消费者组使得在消息消费方面,实现负载均衡和容错的目标变得非常容易。要注意的是,消费者组的消费者实例必须订阅完全相同的Topic。RocketMQ 支持两种消息模式:集群消费(Clustering)和广播消费(Broadcasting)。集群消费模式下, 相同Consumer Group的每个Consumer实例平均分摊消息。广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。

4 主题(Topic)
表示一类消息的集合,每个主题包含若干条消息,每条消息只能属于一个主题,是RocketMQ进行消息订阅的基本单位。Topic只是一个逻辑概念,并不实际保存消息。同一个Topic下的消息,会分片保存到不同的Broker上,而每一个分片单位,就叫做MessageQueue。MessageQueue是一个具有FIFO特性的队列结构,生产者发送消息与消费者消费消息的最小单位。

5 代理服务器(Broker Server)
消息中转角色,负责存储消息、转发消息。代理服务器在RocketMQ系统中负责接收从生产者发送来的消息并存储、同时为消费者的拉取请求作准备。代理服务器也存储消息相关的元数据,包括消费者组、消费进度偏移和主题和队列消息等。
Broker Server是RocketMQ真正的业务核心,包含了多个重要的子模块:
Remoting Module:整个Broker的实体,负责处理来自clients端的请求。
Client Manager:负责管理客户端(Producer/Consumer)和维护Consumer的Topic订阅信息
Store Service:提供方便简单的API接口处理消息存储到物理硬盘和查询功能。
HA Service:高可用服务,提供Master Broker 和 Slave Broker之间的数据同步功能。
Index Service:根据特定的Message key对投递到Broker的消息进行索引服务,以提供消息的快速查询。而Broker Server要保证高可用需要搭建主从集群架构。RocketMQ中有两种Broker架构模式:


普通集群:
这种集群模式下会给每个节点分配一个固定的角色,master负责响应客户端的请求,并存储消息。slave则只负责对master的消息进行同步保存,并响应部分客户端的读请求。消息同步方式分为同步同步和异步同步。这种集群模式下各个节点的角色无法进行切换,也就是说,master节点挂了,这一组Broker就不可用了。


Dledger高可用集群:
Dledger是RocketMQ自4.5版本引入的实现高可用集群的一项技术。这个模式下的集群会随机选出一个节点作为master,而当master节点挂了后,会从slave中自动选出一个节点升级成为master。
Dledger技术做的事情:1、从集群中选举出master节点 2、完成master节点往slave节点的消息同步。

6 名字服务(Name Server)
名称服务充当路由消息的提供者。Broker Server会在启动时向所有的NameServer注册自己的服务信息,并且后续通过心跳请求的方式保证这个服务信息的实时性。生产者或消费者能够通过名字服务查找各主题相应的Broker IP列表。多个Namesrv实例组成集群,但相互独立,没有信息交换。这种特性也就意味着NameServer中任意的节点挂了,只要有一台服务节点正常,整个路由服务就不会有影响。当然,这里不考虑节点的负载情况。


7 消息(Message)
消息系统所传输信息的物理载体,生产和消费数据的最小单位,每条消息必须属于一个主题Topic。RocketMQ中每个消息拥有唯一的Message ID,且可以携带具有业务标识的Key。系统提供了通过Message ID和Key查询消息的功能。并且Message上有一个为消息设置的标志,Tag标签。用于同一主题下区分不同类型的消息。来自同一业务单元的消息,可以根据不同业务目的在同一主题下设置不同标签。标签能够有效地保持代码的清晰度和连贯性,并优化RocketMQ提供的查询系统。消费者可以根据Tag实现对不同子主题的不同消费逻辑,实现更好的扩展性。

整体的基础概念如下图总结:

到此、rocketMq基本原理分享完毕,下篇我们分享使用中遇到的问题,敬请期待!

 

精华推荐 | 深入浅出RocketMQ原理及实战「底层原理挖掘系列」透彻剖析贯穿RocketMQ的Broker服务端自动创建topic的原理分析和问题要点指南

前提背景

使用RocketMQ进行发消息时,一般我们是必须要指定topic,此外topic必须要提前建立,但是topic的创建(自动或者手动方式)的设置有一个开关autoCreateTopicEnable,此部分主要会在broker节点的配置文件的时候进行设置,运行环境中会使用默认设置autoCreateTopicEnable = true,但是这样就会导致topic的设置不容易规范管理,所以在生产环境中会在Broker设置参数autoCreateTopicEnable = false。那么如果此参数稍有偏差,或者没有提前手动创建topic,则会频繁出现No route info of this topic这个错误,那么接下来我们探索一下此问题的出现原因以及系统如何进行创建topic。

No route info of this topic

相信做过RocketMQ项目的小伙伴们,可能对No route info of this topic一点都不陌生,说明的含义起始就是无法解析或者路由这个topic,但是造成的原因有很多种。

没有配置NameServer服务

Broker启动时我们没有配置NameSrv地址,发送程序会报错:No route info of this topic。但当我们配上NameSrv地址后,再次启动,可以正常发送消息。

没有建立autoCreateTopicEnable=true且没有创建该topic

当autoCreateTopicEnable=false时,DefaultMQProducerImpl.sendDefaultImpl,当发消息的时候肯定先要获取关于topic的一些信息,比如有几个消息队列,是不时有序topic,有这个topic的Broker列表等,当获取不到正确的信息时,就会抛出异常

RocketMQ的客户端版本与服务端版本不一致

RocketMQ Java客户端调用No route info of this topic错误(原因版本不一致)。此时,即使启动broker的时候设置autoCreateTopicEnable=true也没有用,假如,使用的rocketmq的版本是4.9.0,java client端版本4.3.0

精华推荐

RocketMQ 4.3.0版本的自动创建(autoCreateTopicEnable),客户端传递使用的AUTO_CREATE_TOPIC_KEY_TOPIC是”AUTO_CREATE_TOPIC_KEY”,新版本的client,客户端传递的默认AUTO_CREATE_TOPIC_KEY_TOPIC是“TBW102”。

org.apache.rocketmq.client.producer.DefaultMQProducer#createTopicKey
org.apache.rocketmq.common.MixAll#AUTO_CREATE_TOPIC_KEY_TOPIC
public static final String AUTO_CREATE_TOPIC_KEY_TOPIC = "TBW102";
实际代码
> 4.4.0版本

精华推荐

<=4.3.0版本

精华推荐

方案1:要不就进行调整client客户端版本的version
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.5.1</version>
</dependency>
方案2:调整自动创建代码为AUTO_CREATE_TOPIC_KEY
DefaultMQProducer producer = new DefaultMQProducer("please_rename_unidcque_group_name");
//设置自动创建topic的key值
producer.setCreateTopicKey("AUTO_CREATE_TOPIC_KEY");

Topic之前并未创建过,Broker未配置NameSrv地址,无法发送,而配置NameSrv后则可以正常发送。这中间有2个问题: 1、topic是怎么自动创建的? 2、topic自动创建过程中Broker、NameSrv如何协作配合的?

分析以下如何自动创建topic的源码流程

RocketMQ基本路由规则

精华推荐

  1. Broker在启动时向Nameserver注册存储在该服务器上的路由信息,并每隔30s向Nameserver发送心跳包,并更新路由信息。 Nameserver每隔10s扫描路由表,如果检测到Broker服务宕机,则移除对应的路由信息。
  2. 消息生产者每隔30s会从Nameserver重新拉取Topic的路由信息并更新本地路由表;在消息发送之前,如果本地路由表中不存在对应主题的路由消息时,会主动向Nameserver拉取该主题的消息。
  3. 如果autoCreateTopicEnable设置为true,消息发送者向NameServer查询主题的路由消息返回空时,会尝试用一个系统默认的主题名称(MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC),此时消息发送者得到的路由信息为:
默认Topic的路由信息是如何创建的?

Nameserver?broker?当autoCreateTopicEnable=false时,DefaultMQProducerImpl.sendDefaultImpl,当发消息的时候肯定先要获取关于topic的一些信息,比如有几个消息队列,是不时有序topic,有这个topic的Broker列表等,当获取不到正确的信息时,就会抛出异常

private SendResult sendDefaultImpl(
Message msg,
final CommunicationMode communicationMode,
final SendCallback sendCallback,
final long timeout
) throws MQClientException, RemotingException, MQBrokerException, InterruptedException
this.makeSureStateOK();
Validators.checkMessage(msg, this.defaultMQProducer);
final long invokeID = random.nextLong();
long beginTimestampFirst = System.currentTimeMillis();
long beginTimestampPrev = beginTimestampFirst;
long endTimestamp = beginTimestampFirst;
// 如果获取到topic的路由信息,则发送,否则抛异常
TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
if (topicPublishInfo != null && topicPublishInfo.ok())
... ...

List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
if (null == nsList || nsList.isEmpty())
throw new MQClientException(
"No name server address, please set it." + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null).setResponseCode(ClientErrorCode.NO_NAME_SERVER_EXCEPTION);

throw new MQClientException("No route info of this topic, " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);

tryToFindTopicPublishInfo是发送的关键,如果获取到topic的信息,则发送,否则就异常;因此之前No route info of this topic的异常,就是Producer获取不到Topic的信息,导致发送失败。

先从topicPublishInfoTable缓存中获取
private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) 
// topicPublishInfoTable是Producer本地缓存的topic信息表
// Producer启动后,会添加默认的topic:TBW102
TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
if (null == topicPublishInfo || !topicPublishInfo.ok())
this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
// 未获取到,从NameSrv获取该topic的信息
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
topicPublishInfo = this.topicPublishInfoTable.get(topic);

// 获取到了,则返回
if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok())
return topicPublishInfo;
else
// 没获取到,再换种方式从NameSrv获取
// 如果再获取不到,那后续就无法发送了
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;

  1. Producer本地topicPublishInfoTable变量中没有topic的信息,只缓存了TBW102。
  2. 尝试从NameSrv获取Topic的信息。获取失败,NameSrv中根本没有Topic,因为这个topic是Producer发送时设置的,没有同步到NameSrv。
  3. 再换种方式从NameSrv获取,如果获取到了,那么可以执行发送流程,如果还是没有获取到,就会抛No route info of this topic的异常了。
再从NameServer服务中进行获取
public boolean updateTopicRouteInfoFromNameServer(final String topic) 
return updateTopicRouteInfoFromNameServer(topic, false, null);
  1. 第1次获取时,isDefault传的false,defaultMQProducer传的null,因此在updateTopicRouteInfoFromNameServer会走else分支,用Topic去获取
  2. 第2次获取时,isDefault传的true,defaultMQProducer也传值了,因此会走if分支,将入参的topic转换为默认的TBW102,获取TBW102的信息
  3. 不管Broker配没配NameSrv地址,获取Topic的信息,必失败
  4. 获取TBW102信息:
  • 2.1 Broker配置了NameSrv地址,成功
  • 2.2 Broker没有配置NameSrv地址,失败

生产者首先向NameServer查询路由信息,由于是一个不存在的主题,故此时返回的路由信息为空,RocketMQ会使用默认的主题再次寻找,由于开启了自动创建路由信息,NameServer会向生产者返回默认主题的路由信息。

然后从返回的路由信息中选择一个队列(默认轮询)。消息发送者从Nameserver获取到默认的Topic的队列信息后,队列的个数会改变吗?

从NameServer中获取,注意这个isDefault=false,defaultMQProducer=null

温馨提示:消息发送者在到默认路由信息时,其队列数量,会选择DefaultMQProducer#defaultTopicQueueNums与Nameserver返回的的队列数取最小值,DefaultMQProducer#defaultTopicQueueNums默认值为4,故自动创建的主题,其队列数量默认为4。

获取消息对应的topic信息

发请求​​RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINTO_BY_TOPIC, requestHeader)​​,但是因为没有任何一个Broker有关于这个topic的信息,所以namesrv就会返回topic不存在,处理请求的代码在DefaultRequestProcessor的。

case RequestCode.GET_ROUTEINTO_BY_TOPIC:  return this.getRouteInfoByTopic(ctx, request);

也就是回应码ResponseCode.TOPIC_NOT_EXIST,然后抛出异常 throw new MQClientException(response.getCode(), response.getRemark());被捕获之后退出返回false。

从NameServer获取相关的Topic信息数据

updateTopicRouteInfoFromNameServer最终会发给NameSrv一个GET_ROUTEINTO_BY_TOPIC请求

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer)
try
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
try
TopicRouteData topicRouteData;
if (isDefault && defaultMQProducer != null)
topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
1000 * 3);
if (topicRouteData != null)
for (QueueData data : topicRouteData.getQueueDatas())
int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
data.setReadQueueNums(queueNums);
data.setWriteQueueNums(queueNums);


else
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);

catch (Exception e)
if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(MixAll.DEFAULT_TOPIC))
log.warn("updateTopicRouteInfoFromNameServer Exception", e);

finally
this.lockNamesrv.unlock();

else
log.warn("updateTopicRouteInfoFromNameServer tryLock timeout ms", LOCK_TIMEOUT_MILLIS);

catch (InterruptedException e)
log.warn("updateTopicRouteInfoFromNameServer Exception", e);

return false;

因为if条件不满足,所以获取默认的topic信息,注意isDefault=true,defaultMQProducer=defaultMQProducer

if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) 
return topicPublishInfo;
else
this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
topicPublishInfo = this.topicPublishInfoTable.get(topic);
return topicPublishInfo;

默认的topic为"TBW102",这个时候如果namesrv中如果还是没有这个topic的信息的话,就会抛出异常No route info of this topic。 autoCreateTopicEnable=true的作用。

Broker启动流程自动创建topic
  • 在Broker启动流程中,会构建TopicConfigManager对象,其构造方法中首先会判断是否开启了允许自动创建主题,如果启用了自动创建主题,则向topicConfigTable中添加默认主题的路由信息。
  • 当Broker启动时,TopicConfigManager初始化,这里会判断该标识,创建TBW102topic,并且在后续的心跳中把信息更新到namesrv中,这样在发消息的时候就不会抛出不存在的异常。
// MixAll.DEFAULT_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable())
String topic = MixAll.DEFAULT_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);

该topicConfigTable中所有的路由信息,会随着Broker向Nameserver发送心跳包中,Nameserver收到这些信息后,更新对应Topic的路由信息表。

BrokerConfig的defaultTopicQueueNum默认为8。两台Broker服务器都会运行上面的过程,故最终Nameserver中关于默认主题的路由信息中,会包含两个Broker分别各8个队列信息。

TopicConfigManager构造方法

当从namesrv查出Topic相关的信息时,在topicRouteData2TopicPublishInfo设置消息队列数量 info.getMessageQueueList().add(mq);,调用updateTopicPublishInfo方法更新缓存topicPublishInfoTable

// Update Pub info

TopicPublishInfo publishInfo = topicRouteData2TopicPublishInfo(topic, topicRouteData);
publishInfo.setHaveTopicRouterInfo(true);
Iterator<Entry<String, MQProducerInner>> it = this.producerTable.entrySet().iterator();
while (it.hasNext())
Entry<String, MQProducerInner> entry = it.next();
MQProducerInner impl = entry.getValue();
if (impl != null)
impl.updateTopicPublishInfo(topic, publishInfo);


然后if (topicPublishInfo != null && topicPublishInfo.ok()) 这个条件就会符合,那个异常就不会抛出。

当autoCreateTopicEnable=false时
  1. 创建topic的类UpdateTopicSubCommand(),设置相应的信息,最后调用defaultMQAdminExt.createAndUpdateTopicConfig(addr, topicConfig);
  2. 发消息RequestCode.UPDATE_AND_CREATE_TOPIC,AdminBrokerProcessor处理消息 case RequestCode.UPDATE_AND_CREATE_TOPIC: return this.updateAndCreateTopic(ctx, request);
  3. 同步给其他Broker
this.brokerController.getTopicConfigManager().updateTopicConfig(topicConfig);
this.brokerController.registerBrokerAll(false, true);
Broker端收到消息后的处理流程

服务端收到消息发送的处理器为:SendMessageProcessor,在处理消息发送时,会调用super.msgCheck方法:

AbstractSendMessageProcessor#msgCheck

在Broker端,首先会使用TopicConfigManager根据topic查询路由信息,如果Broker端不存在该主题的路由配置(路由信息),此时如果Broker中存在默认主题的路由配置信息,则根据消息发送请求中的队列数量,在Broker创建新Topic的路由信息。这样Broker服务端就会存在主题的路由信息。

在Broker端的topic配置管理器中存在的路由信息,一会向Nameserver发送心跳包,汇报到Nameserver,另一方面会有一个定时任务,定时存储在broker端,具体路径为$ROCKET_HOME/store/config/topics.json中,这样在Broker关闭后再重启,并不会丢失路由信息。

TBW102是为何物?

TBW102是Broker启动时,当autoCreateTopicEnable的配置为true时,会自动创建该默认topic。

public TopicConfigManager(BrokerController brokerController) 
this.brokerController = brokerController;
// ...

// MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC
if (this.brokerController.getBrokerConfig().isAutoCreateTopicEnable())
String topic = MixAll.AUTO_CREATE_TOPIC_KEY_TOPIC;
TopicConfig topicConfig = new TopicConfig(topic);
this.systemTopicList.add(topic);
topicConfig.setReadQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
topicConfig.setWriteQueueNums(this.brokerController.getBrokerConfig()
.getDefaultTopicQueueNums());
int perm = PermName.PERM_INHERIT | PermName.PERM_READ | PermName.PERM_WRITE;
topicConfig.setPerm(perm);
this.topicConfigTable.put(topicConfig.getTopicName(), topicConfig);


// ...

autoCreateTopicEnable的默认值是true,可以同步外部配置文件,让Broker启动时加载,来改变该值。我理解的TBW102的作用是当开启自动创建topic功能,发送时用了未配置的topic,可以让该topic继承默认TBW102的配置,实现消息的发送。

总结分析

  1. client本地首先没有缓存对应topic的路由信息,然后先去nameserver去查找,nameserver中也没有此topic的路由信息,然后返回给client。client接收到返回后再向nameserver请求topic为tbw102的路由信息。
  2. 如果有broker设置了autocreateTopic,则broker在启动的时候会在topicManager中创建对应的topicconfig通过心跳发送给nameserver,namerserver会将其保存。nameserver将之前保存的tbw102的路由信息返回给请求的client。
  3. client拿到了topic为tbw102的路由信息后返回,client根据返回的tbw102路由信息(里面包含所有设置了autocreateTopic为true的broker,默认每个broker会在client本地创建DefaultTopicQueueNums=4个读写队列选择,假设两个broker则会有8个队列让你选择)先缓存到本地的topicPublishInfoTable表中,key为此topic ,value为此topicRouteData,轮询选择一个队列进行发送。

精华推荐

根据选择到的队列对应的broker发送该topic消息。

broker在接收到此消息后会在msgcheck方法中调用createTopicInSendMessageMethod方法创建topicConfig信息塞进topicConfigTable表中,然后就跟发送已经创建的topic的流程一样发送消息了。

同时topicConfigTable会通过心跳将新的这个topicConfig信息发送给nameserver。

nameserver接收到后会更新topic的路由信息,如果之前接收到消息的broker没有全部覆盖到,因为broker会30S向nameserver发送一次心跳,心跳包里包含topicconfig,覆盖到的broker会将自动创建好的topicconfig信息发送给nameserver,从而在nameserver那边接收到后会注册这个新的topic信息,因为消费者每30S也会到nameserver去更新本地的topicrouteinfo,请求发送到nameserver得到了之前覆盖到的broker发送的心跳包更新后的最新topic路由信息,那么未被覆盖的broker就永远不会加入到这个负载均衡了,就会造成负载均衡达不到预期了,即所有能自动创建topic的broker不能全部都参与进来。

以上是关于rocketMq架构原理精华分析的主要内容,如果未能解决你的问题,请参考以下文章

精华推荐 | 深入浅出RocketMQ原理及实战「底层原理挖掘系列」透彻剖析贯穿RocketMQ的Broker服务端自动创建topic的原理分析和问题要点指南

精华推荐 | 深入浅出RocketMQ原理及实战「性能原理挖掘系列」透彻剖析贯穿RocketMQ的事务性消息的底层原理并在分析其实际开发场景

精华推荐 | 深入浅出RocketMQ原理及实战「底层原理挖掘系列」透彻剖析贯穿RocketMQ的Broker服务端自动创建topic的原理分析和问题要点指南

精华收藏ClickHouse 系统架构存储引擎 查询引擎原理分析

精华推荐 | 深入浅出RocketMQ原理及实战「底层原理挖掘系列」透彻剖析贯穿RocketMQ的存储系统的实现原理和持久化机制

精华推荐 | 深入浅出RocketMQ原理及实战「底层原理挖掘系列」透彻剖析贯穿RocketMQ的存储系统的实现原理和持久化机制