rocketmq源码解析-namesrv与broker

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketmq源码解析-namesrv与broker相关的知识,希望对你有一定的参考价值。

参考技术A rocketmq是阿里巴巴开源的mq,目前在github拥有13+k的star。rocketmq是众多mq实现中,较少使用java实现的,因此对于java技术栈的人来说,拿rocketmq的源码作为切入点,理解mq的实现原理是非常合适的。
本文会从四大部分(namesrv、broker、producer、consumer)讲解rocketmq源码,之间的关系可见rocketmq架构图。

namesrv是类似zk的命名服务端,broker向它发起注册、producer与consumer向他拉取topic的队列。为什么不用现有的比如zk等中间件呢?应该是因为解耦:功能比较简单不需要引入外界中间件,避免引入新的复杂度,控制权在自己手上,简单即是美。

来到NamesrvStartup,里面定义里main方法,启动时会对NamesrvController进行创建,然后调用start方法对其进行启动,而启动过程是先initialize再start,打扫干净再迎客。

先看initialize,此方法主要对kvConfigManager进行加载、初始化远程服务器并注册处理器、初始化两个定时任务(扫描不活跃的broker、打印周期日志)。

再看start,主要是启动远程服务器,对本地端口进行绑定。

那broker怎么么向它发起注册?producer与consumer怎么向他拉取topic的队列?由上文可知,在初始化方法已经向远程服务器注册了处理器DefaultRequestProcessor,当请求进来时会流向processRequest方法。

我们挑选一些核心的请求处理进行解析:
1.REGISTER_BROKER
大于等于V3_0_11版本由registerBrokerWithFilterServer处理,主要是调用RouteInfoManager的registerBroker进行注册。

来到registerBroker,主要是对五个map存放broker相关信息,
clusterAddrTable存放的是clusterName与brokerName的对应关系。
brokerAddrTable存放的是brokerName与brokerAddr的对应关系。
brokerLiveTable存放的是brokerAddr与brokerLiveInfo的对应关系。
filterServerTable存放的是brokerAddr与filterServerList的对应关系。
topicQueueTable存放的是topicName与queueDataList的对应关系。

2.GET_ROUTEINFO_BY_TOPIC
调用getRouteInfoByTopic,继续通过routeInfoManager的pickupTopicRouteData方法获取topicRouteData。

来到pickupTopicRouteData,实际上就是通过topicQueueTable获取到队列信息,然后根据brokerName从brokerAddrTable获取到brokerData,最后根据brokerAddr获取到filterServerList,进行返回。

来到broker的BrokerStartup,先创建brokerController在调用start启动broker。

先看createBrokerController,里面对配置进行处理,然后创建BrokerController并进行initialize。

initialize方法有点长,主要是对manager(topicConfigManager、consumerOffsetManager等)的加载,messageStore的初始化,线程池的生成(请求处理、心跳、落盘、日志等),生成远程服务器并且注册处理器。

而start会调用controller的start进行处理,其实就是调用各个start方法和向namesrv注册。

还记得注册的处理器吗?我们看下主要的处理器源码。

RocketMQ源码解析-Broker与Namesrv以及Consumer交互

​ 这一篇我们主要来分析下Broker里面的部分逻辑–Broker主要负责消息的存储、投递和查询以及服务高可用保证。下面我们就来大体梳理下其关于消息的投递、存储、拉取相关的一些逻辑。

​ 我们知道broker实例时需要注册到namesrv中,然后其他的生产者、消费者再从namesrv中获取到对应的Broker实例信息,例如broker保存了哪些topictopic对应的队列信息、此borker对应的ip信息等。

一、Broker的启动初始化

​ 下面我们来看下broker启动后的初始化加载:

​ 在这里初始化的时候,主要有两个信息的初始化加载,也就是topic信息、队列消费的offset信息,主要是加载持久化的文件信息。

1、topic信息文件

1)、topic的加载

this.topicConfigManager.load();

​ 这里主要就是加载store\\config下面的topics.json文件信息

public boolean load() 
    String fileName = null;
    try 
        fileName = this.configFilePath();
        String jsonString = MixAll.file2String(fileName);

        if (null == jsonString || jsonString.length() == 0) 
            return this.loadBak();
         else 
            this.decode(jsonString);
            log.info("load " + fileName + " OK");
            return true;
        
     catch (Exception e) 
        log.error("load " + fileName + " failed, and try to load backup file", e);
        return this.loadBak();
    

​ 如果找不到,就会再去加载部分文件。

public void decode(String jsonString) 
    if (jsonString != null) 
        TopicConfigSerializeWrapper topicConfigSerializeWrapper =
            TopicConfigSerializeWrapper.fromJson(jsonString, TopicConfigSerializeWrapper.class);
        if (topicConfigSerializeWrapper != null) 
            this.topicConfigTable.putAll(topicConfigSerializeWrapper.getTopicConfigTable());
            this.dataVersion.assignNewOne(topicConfigSerializeWrapper.getDataVersion());
            this.printLoadDataWhenFirstBoot(topicConfigSerializeWrapper);
        
    

​ 加载后就会将json转换为TopicConfigSerializeWrapper,再其放到topicConfigTable中。

public class TopicConfigSerializeWrapper extends RemotingSerializable 
    private ConcurrentMap<String, TopicConfig> topicConfigTable =
        new ConcurrentHashMap<String, TopicConfig>();
    private DataVersion dataVersion = new DataVersion();

	"dataVersion":
		"counter":5,
		"timestamp":1645867253838
	,
	"topicConfigTable":
		"cluster_queue_topic2":
			"order":false,
			"perm":6,
			"readQueueNums":4,
			"topicFilterType":"SINGLE_TAG",
			"topicName":"cluster_queue_topic2",
			"topicSysFlag":0,
			"writeQueueNums":4
		
	

​ 这上面就是topics.json文件中的信息(已删除其他的topic信息),可以看到其保存的当前topic的权限、读取的队列数量等。

2)、topic的添加

​ 上面是加载文件,当我们知道这个文件在创建topic或者删除的时候是需要更新的,例如创建topic的时候。

public TopicConfig createTopicInSendMessageMethod(final String topic, final String defaultTopic,
    final String remoteAddress, final int clientDefaultTopicQueueNums, final int topicSysFlag) 
    TopicConfig topicConfig = null;
    boolean createNew = false;
    try 
        if (this.lockTopicConfigTable.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) 
            try 
					........
                if (topicConfig != null) 
                    log.info("Create new topic by default topic:[] config:[] producer:[]",
                        defaultTopic, topicConfig, remoteAddress);
                    this.topicConfigTable.put(topic, topicConfig);
                    this.dataVersion.nextVersion();
                    createNew = true;
                    this.persist();
                
             finally 
                this.lockTopicConfigTable.unlock();
            
        
     catch (InterruptedException e) 
        log.error("createTopicInSendMessageMethod exception", e);
    

    if (createNew) 
        this.brokerController.registerBrokerAll(false, true,true);
    

    return topicConfig;

​ 例如新建topic后会将其添加到topicConfigTable中,然后再将其this.persist()添加到文件中,这个同样是ConfigManager的公共方法。同时如果这个topic是新建的,会再通过registerBrokerAll方法将其再更新到namesrv中,这个方法也是我们等下要梳理的。

2、消费的队列信息文件

消费队列是加载的store\\config下面的consumerOffset.json文件信息

加载逻辑与后面的topic加载类似,都是同一个父类ConfigManager


	"offsetTable":
		"%RETRY%cluster_consumer_queue_group@cluster_consumer_queue_group":0:0
		,
		"cluster_queue_topic2@cluster_consumer_queue_group":0:102,1:102,2:85,3:84
		,
		"%RETRY%simple_batch_consumer_group@simple_batch_consumer_group":0:0
		
	

​ 这个文件主要是记录topic对应的queue队列的offset消费位置。例如例如上面的

cluster_queue_topic2@cluster_consumer_queue_group":0:102,1:102,2:85,3:84

就是记录了消费组cluster_consumer_queue_group对应的topic-cluster_queue_topic2下面的4个队列消费的offset位置。

二、Broker的注册

1、broker的逻辑

broker启动后,会将其的信息注册到namesrv中。其是在broker的启动方法的调用类BrokerController

public void start() throws Exception 
    if (this.messageStore != null) 
        this.messageStore.start();
    

    if (this.remotingServer != null) 
        this.remotingServer.start();
    
	.......
    if (this.brokerOuterAPI != null) 
        this.brokerOuterAPI.start();
    
		.......
    this.registerBrokerAll(true, false, true);
		........

public List<RegisterBrokerResult> registerBrokerAll(
    final String clusterName,final String brokerAddr,final String brokerName,final long brokerId,
    final String haServerAddr,final TopicConfigSerializeWrapper topicConfigWrapper,
    final List<String> filterServerList,final boolean oneway,final int timeoutMills,
    final boolean compressed) 
    final List<RegisterBrokerResult> registerBrokerResultList = Lists.newArrayList();
    List<String> nameServerAddressList = this.remotingClient.getNameServerAddressList();
    if (nameServerAddressList != null && nameServerAddressList.size() > 0) 

        final RegisterBrokerRequestHeader requestHeader = new RegisterBrokerRequestHeader();
        requestHeader.setBrokerAddr(brokerAddr);
        requestHeader.setBrokerId(brokerId);
        requestHeader.setBrokerName(brokerName);
        requestHeader.setClusterName(clusterName);
        requestHeader.setHaServerAddr(haServerAddr);
        requestHeader.setCompressed(compressed);

        RegisterBrokerBody requestBody = new RegisterBrokerBody();
        requestBody.setTopicConfigSerializeWrapper(topicConfigWrapper);
        requestBody.setFilterServerList(filterServerList);
        final byte[] body = requestBody.encode(compressed);
        final int bodyCrc32 = UtilAll.crc32(body);
        requestHeader.setBodyCrc32(bodyCrc32);
        final CountDownLatch countDownLatch = new CountDownLatch(nameServerAddressList.size());
        for (final String namesrvAddr : nameServerAddressList) 
            brokerOuterExecutor.execute(new Runnable() 
                @Override
                public void run() 
                    try 
                        RegisterBrokerResult result = registerBroker(namesrvAddr,oneway, timeoutMills,requestHeader,body);54
                        .........
                
            );
        
		........
    return registerBrokerResultList;

​ 我们可以看到这里会将当前brokeripbrokerNameclusterNameTopicConfigSerializeWrapper等信息告诉namesrv。同时我们可以看到for (final String namesrvAddr : nameServerAddressList),如果多个namesrv的话,其都会进行注册。

2、namesrv逻辑

上面是broker注册的逻辑,下面我们就来看下namesrv的接收逻辑逻辑,其的处理逻辑主要是在DefaultRequestProcessor类做分发:

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException 
		.......
    switch (request.getCode()) 
        case RequestCode.PUT_KV_CONFIG:
            return this.putKVConfig(ctx, request);
        case RequestCode.GET_KV_CONFIG:
            return this.getKVConfig(ctx, request);
        case RequestCode.DELETE_KV_CONFIG:
            return this.deleteKVConfig(ctx, request);
        case RequestCode.QUERY_DATA_VERSION:
            return queryBrokerTopicConfig(ctx, request);
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) 
                return this.registerBrokerWithFilterServer(ctx, request);
             else 
                return this.registerBroker(ctx, request);
            
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINTO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
       		......
        default:
            break;
    
    return null;

​ 这里当前是走的registerBrokerWithFilterServer方法:

public RemotingCommand registerBrokerWithFilterServer(ChannelHandlerContext ctx, RemotingCommand request)
  		.........
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        registerBrokerBody.getTopicConfigSerializeWrapper(),
        registerBrokerBody.getFilterServerList(),
        ctx.channel());

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);

    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;

​ 可以看到这里主要是将请求的信息交给RouteInfoManager来处理,RouteInfoManager主要是将这些信息保存到其的对应成员变量中,这些内容可以看前面写的一篇文章:RocketMQ源码解析-NameServer篇

三、consumer与broker的逻辑

​ 我们知道broker是消息实际保存落地的地方,所以,consumerproducer都是从namesrv获取到broker的对应信息,然后直接再去broker推送、或者获取消息内容。

1、消费者demo

public static void main(String[] args) throws MQClientException 

    DefaultMQPushConsumer defaultMQPushConsumer = new DefaultMQPushConsumer("cluster_consumer_queue_group");
    defaultMQPushConsumer.setNamesrvAddr("127.0.0.1:9876");
    defaultMQPushConsumer.setInstanceName("11");
    String simpleTopic = "cluster_queue_topic2";
    defaultMQPushConsumer.subscribe(simpleTopic,"*");
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    defaultMQPushConsumer.registerMessageListener(new MessageListenerConcurrently() 
        @Override
        public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
            for (MessageExt messageExt : msgs) 
                System.out.println("SimpleConsumer consumer Msg - " + JSONObject.toJSONString(messageExt));
            
            return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
        
    );
    defaultMQPushConsumer.start();

2、消费者TopicRouteData获取

​ 这里就是一个消费者基本运行。在其启动后会进行很多的初始化内容,例如从namesrv中获取对应的topic相关的队列信息,通过getTopicRouteInfoFromNameServer方法:

public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
    DefaultMQProducer defaultMQProducer) 
    try 
        if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) 
                   ...........
                 else 
                    topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, 1000 * 3);
                
                if (topicRouteData != null) 
                    TopicRouteData old = this.topicRouteTable.get(topic);
              		...........
                    if (changed) 
                        TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                        for (BrokerData bd : topicRouteData.getBrokerDatas()) 
                            this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                        
                        .....
                    		..........
                        this.topicRouteTable.put(topic, cloneTopicRouteData);
                        return true;
                    
                 
       		.......
    return false;

这里主要就是从namesrv中获取TopicRouteData信息,我们知道broker会将其的topic信息注册、更新到namesrv中。

public class TopicRouteData e

以上是关于rocketmq源码解析-namesrv与broker的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码解析-Broker与Namesrv以及Consumer交互

RocketMQ-Namesrv源码解析

RocketMQ-Namesrv源码解析

rocketmq源码解析之NamesrvController创建

RocketMQ 源码分析 —— 高可用

RocketMQ源码解析-NameServer篇