RocketMQ源码解析-Broker与Namesrv以及Consumer交互
Posted _微风轻起
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码解析-Broker与Namesrv以及Consumer交互相关的知识,希望对你有一定的参考价值。
这一篇我们主要来分析下Broker里面的部分逻辑–Broker主要负责消息的存储、投递和查询以及服务高可用保证。下面我们就来大体梳理下其关于消息的投递、存储、拉取相关的一些逻辑。
我们知道broker实例时需要注册到namesrv
中,然后其他的生产者、消费者再从namesrv
中获取到对应的Broker
实例信息,例如broker保存了哪些topic
、topic
对应的队列信息、此borker
对应的ip信息等。
一、Broker的启动初始化
下面我们来看下broker
启动后的初始化加载:
在这里初始化的时候,主要有两个信息的初始化加载,也就是topic
信息、队列消费的offset
信息,主要是加载持久化的文件信息。
1、topic信息文件
1)、topic的加载
this.topicConfigManager.load();
这里主要就是加载store\\config
下面的topics.json
文件信息
public boolean load()
String fileName = null;
try
fileName = this.configFilePath();
String jsonString = MixAll.file2String(fileName);
if (null == jsonString || jsonString.length() == 0)
return this.loadBak();
else
this.decode(jsonString);
log.info("load " + fileName + " OK");
return true;
catch (Exception e)
log.error("load " + fileName + " failed, and try to load backup file", e);
return this.loadBak();
如果找不到,就会再去加载部分文件。
public void decode(String jsonString)
if (jsonString != null)
TopicConfigSerializeWrapper topicConfigSerializeWrapper =
TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
if (topicConfigSerializeWrapper != null)
this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());
this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);
加载后就会将json转换为TopicConfigSerializeWrapper
,再其放到topicConfigTable
中。
public class TopicConfigSerializeWrapper extends RemotingSerializable
private ConcurrentMap<String, TopicConfig> topicConfigTable =
new ConcurrentHashMap<String, TopicConfig>();
private DataVersion dataVersion = new DataVersion();
"dataVersion":
"counter":5,
"timestamp":1645867253838
,
"topicConfigTable":
"cluster_queue_topic2":
"order":false,
"perm":6,
"readQueueNums":4,
"topicFilterType":"SINGLE_TAG",
"topicName":"cluster_queue_topic2",
"topicSysFlag":0,
"writeQueueNums":4
这上面就是topics.json
文件中的信息(已删除其他的topic
信息),可以看到其保存的当前topic
的权限、读取的队列数量等。
2)、topic的添加
上面是加载文件,当我们知道这个文件在创建topic
或者删除的时候是需要更新的,例如创建topic的时候。
public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag)
TopicConfig topicConfig = null;
boolean createNew = false;
try
if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
try
........
if (topicConfig != null)
log.info("Create new topic by default topic:[] config:[] producer:[]",
defaultTopic, topicConfig, remoteAddress);
this.topicConfigTable.put(topic, topicConfig);
this.dataVersion.nextVersion();
createNew = true;
this.persist();
finally
this.lockTopicConfigTable.unlock();
catch (InterruptedException e)
log.error("createTopicInSendMessageMethod exception", e);
if (createNew)
this.brokerController.registerBrokerAll(false, true,true);
return topicConfig;
例如新建topic
后会将其添加到topicConfigTable
中,然后再将其this.persist()
添加到文件中,这个同样是ConfigManager
的公共方法。同时如果这个topic
是新建的,会再通过registerBrokerAll
方法将其再更新到namesrv
中,这个方法也是我们等下要梳理的。
2、消费的队列信息文件
消费队列是加载的store\\config
下面的consumerOffset.json
文件信息
加载逻辑与后面的topic
加载类似,都是同一个父类ConfigManager
。
"offsetTable":
"%RETRY%cluster_consumer_queue_group@cluster_consumer_queue_group":0:0
,
"cluster_queue_topic2@cluster_consumer_queue_group":0:102,1:102,2:85,3:84
,
"%RETRY%simple_batch_consumer_group@simple_batch_consumer_group":0:0
这个文件主要是记录topic
对应的queue
队列的offset
消费位置。例如例如上面的
cluster_queue_topic2@cluster_consumer_queue_group":0:102,1:102,2:85,3:84
就是记录了消费组cluster_consumer_queue_group
对应的topic
-cluster_queue_topic2
下面的4个队列消费的offset
位置。
二、Broker的注册
1、broker的逻辑
broker
启动后,会将其的信息注册到namesrv
中。其是在broker
的启动方法的调用类BrokerController
中
public void start() throws Exception
if (this.messageStore != null)
this.messageStore.start();
if (this.remotingServer != null)
this.remotingServer.start();
.......
if (this.brokerOuterAPI != null)
this.brokerOuterAPI.start();
.......
this.registerBrokerAll(true, false, true);
........
public List<RegisterBrokerResult> registerBrokerAll(
final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,
final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,
final List<String> filterServerList,final boolean oneway,final int timeoutMills,
final boolean compressed)
final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
if (nameServerAddressList != null && nameServerAddressList.size() > 0)
final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
requestHeader.setBrokerAddr(brokerAddr);
requestHeader.setBrokerId(brokerId);
requestHeader.setBrokerName(brokerName);
requestHeader.setClusterName(clusterName);
requestHeader.setHaServerAddr(haServerAddr);
requestHeader.setCompressed(compressed);
RegisterBrokerBody requestBody = new RegisterBrokerBody();
requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
requestBody.setFilterServerList(filterServerList);
final byte[] body = requestBody.encode(compressed);
final int bodyCrc32 = UtilAll.crc32(body);
requestHeader.setBodyCrc32(bodyCrc32);
final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
for (final String namesrvAddr : nameServerAddressList)
brokerOuterExecutor.execute(new Runnable()
@Override
public void run()
try
RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);54
.........
);
........
return registerBrokerResultList;
我们可以看到这里会将当前broker
的ip
、brokerName
、clusterName
、TopicConfigSerializeWrapper
等信息告诉namesrv
。同时我们可以看到for (final String namesrvAddr : nameServerAddressList)
,如果多个namesrv
的话,其都会进行注册。
2、namesrv逻辑
上面是broker
注册的逻辑,下面我们就来看下namesrv
的接收逻辑逻辑,其的处理逻辑主要是在DefaultRequestProcessor
类做分发:
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException
.......
switch (request.getCode())
case RequestCode.PUT_KV_CONFIG:
return this.putKVConfig(ctx, request);
case RequestCode.GET_KV_CONFIG:
return this.getKVConfig(ctx, request);
case RequestCode.DELETE_KV_CONFIG:
return this.deleteKVConfig(ctx, request);
case RequestCode.QUERY_DATA_VERSION:
return queryBrokerTopicConfig(ctx, request);
case RequestCode.REGISTER_BROKER:
Version brokerVersion = MQVersion.value2Version(request.getVersion());
if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal())
return this.registerBrokerWithFilterServer(ctx, request);
else
return this.registerBroker(ctx, request);
case RequestCode.UNREGISTER_BROKER:
return this.unregisterBroker(ctx, request);
case RequestCode.GET_ROUTEINTO_BY_TOPIC:
return this.getRouteInfoByTopic(ctx, request);
......
default:
break;
return null;
这里当前是走的registerBrokerWithFilterServer
方法:
public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
.........
RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
requestHeader.getClusterName(),
requestHeader.getBrokerAddr(),
requestHeader.getBrokerName(),
requestHeader.getBrokerId(),
requestHeader.getHaServerAddr(),
registerBrokerBody.getTopicConfigSerializeWrapper(),
registerBrokerBody.getFilterServerList(),
ctx.channel());
responseHeader.setHaServerAddr(result.getHaServerAddr());
responseHeader.setMasterAddr(result.getMasterAddr());
byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
response.setBody(jsonValue);
response.setCode(ResponseCode.SUCCESS);
response.setRemark(null);
return response;
可以看到这里主要是将请求的信息交给RouteInfoManager
来处理,RouteInfoManager
主要是将这些信息保存到其的对应成员变量中,这些内容可以看前面写的一篇文章:RocketMQ源码解析-NameServer篇
三、consumer与broker的逻辑
我们知道broker
是消息实际保存落地的地方,所以,consumer
与producer
都是从namesrv
获取到broker
的对应信息,然后直接再去broker
推送、或者获取消息内容。
1、消费者demo
public static void main(String[] args) throws MQClientException
DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("cluster_consumer_queue_group");
defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
defaultMQPushConsumer.setInstanceName("11");
String simpleTopic = "cluster_queue_topic2";
defaultMQPushConsumer.subscribe(simpleTopic,"*");
defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently()
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context)
for (MessageExt messageExt : msgs)
System.out.println("SimpleConsumer consumer Msg - " + JSONObject.toJSONString(messageExt));
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
);
defaultMQPushConsumer.start();
2、消费者TopicRouteData获取
这里就是一个消费者基本运行。在其启动后会进行很多的初始化内容,例如从namesrv
中获取对应的topic相关的队列信息,通过getTopicRouteInfoFromNameServer
方法:
public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
DefaultMQProducer defaultMQProducer)
try
if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS))
...........
else
topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
if (topicRouteData != null)
TopicRouteData old = this.topicRouteTable.get(topic);
...........
if (changed)
TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
for (BrokerData bd : topicRouteData.getBrokerDatas())
this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
.....
..........
this.topicRouteTable.put(topic, cloneTopicRouteData);
return true;
.......
return false;
这里主要就是从namesrv
中获取TopicRouteData
信息,我们知道broker
会将其的topic
信息注册、更新到namesrv
中。
public class TopicRouteData e以上是关于RocketMQ源码解析-Broker与Namesrv以及Consumer交互的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ源码解析-Broker与Namesrv以及Consumer交互