RocketMQ 4.9.1 Source Code Analysis - (HA Module) Slave Offset Reporting and Message Handling

Posted by Java小海.


Following up on the previous post, RocketMQ v4.9.1 Source Code Analysis - HA Master Read/Write Handling, which covered the Master side, this article continues reading the code around the Slave side.

For the slave, we want to answer the following questions:

  1. How does the slave obtain the master's routing information?
  2. How does the slave report its offset to the master?
  3. How does the slave handle the data replicated from the master?

In the overall class diagram, the slave-side code lives in the HAClient class.

HAClient

HAClient is the core of the slave-side processing. It covers three parts:

  1. The slave establishes a connection to the master
  2. The slave reports its replication progress (offset) to the master
  3. The slave receives the data replicated from the master and processes it

HAClient Startup

As mentioned earlier, HAClient is started from HAService's start() method, in store/src/main/java/org/apache/rocketmq/store/ha/HAService.start().

// HAService startup
public void start() throws Exception {
    this.acceptSocketService.beginAccept();
    this.acceptSocketService.start();
    this.groupTransferService.start();
    this.haClient.start();
}


The HAClient startup logic lives in store/src/main/java/org/apache/rocketmq/store/ha/HAService$HAClient.run().

The three core steps are marked in the comments in the code below.

@Override
public void run() {
    log.info(this.getServiceName() + " service started");

    while (!this.isStopped()) {
        try {
            // Step 1: connect to the master
            if (this.connectMaster()) {
                // Step 2: if the interval since the last report has reached the maximum wait time, report immediately
                if (this.isTimeToReportOffset()) {
                    // report the slave offset
                    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
                    if (!result) {
                        this.closeMaster();
                    }
                }
                // wait up to 1 s for read events
                this.selector.select(1000);
                // Step 3: process the data returned by the master
                boolean ok = this.processReadEvent();
                if (!ok) {
                    this.closeMaster();
                }
                // after processing read events, if the slave offset has advanced, report the new offset again
                if (!reportSlaveMaxOffsetPlus()) {
                    continue;
                }

                long interval = HAService.this.getDefaultMessageStore().getSystemClock().now() - this.lastWriteTimestamp;
                if (interval > HAService.this.getDefaultMessageStore().getMessageStoreConfig().getHaHousekeepingInterval()) {
                    log.warn("HAClient, housekeeping, found this connection[" + this.masterAddress + "] expired, " + interval);
                    this.closeMaster();
                    log.warn("HAClient, master not response some time, so close connection");
                }
            } else {
                this.waitForRunning(1000 * 5);
            }
        } catch (Exception e) {
            log.warn(this.getServiceName() + " service has exception. ", e);
            this.waitForRunning(1000 * 5);
        }
    }

    log.info(this.getServiceName() + " service end");
}

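The loop also calls reportSlaveMaxOffsetPlus() after processing read events. That method is not listed above, but roughly it does the following (a paraphrased sketch, not a verbatim copy of the source):

private boolean reportSlaveMaxOffsetPlus() {
    boolean result = true;
    // if the local commitlog has advanced past what was last reported, report the new offset
    long currentPhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
    if (currentPhyOffset > this.currentReportedOffset) {
        this.currentReportedOffset = currentPhyOffset;
        result = this.reportSlaveMaxOffset(this.currentReportedOffset);
        if (!result) {
            // the report failed, so close the connection and let run() start over
            this.closeMaster();
        }
    }
    return result;
}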

Slave Connects to the Master

The connectMaster() method establishes the connection to the master.

// master address (configured in the broker config file)
private final AtomicReference<String> masterAddress = new AtomicReference<>();

private boolean connectMaster() throws ClosedChannelException {
    if (null == socketChannel) {
        String addr = this.masterAddress.get();
        if (addr != null) {
            SocketAddress socketAddress = RemotingUtil.string2SocketAddress(addr);
            if (socketAddress != null) {
                this.socketChannel = RemotingUtil.connect(socketAddress);
                if (this.socketChannel != null) {
                    // register for read events, used to listen for data pushed back by the master
                    this.socketChannel.register(this.selector, SelectionKey.OP_READ);
                }
            }
        }
        // initialize to the current max commitlog offset
        this.currentReportedOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
        this.lastWriteTimestamp = System.currentTimeMillis();
    }

    return this.socketChannel != null;
}


One field worth noting is currentReportedOffset. It represents how far the slave has replicated so far, and its value is what the slave later reports to the master. On initialization it is simply set to the max physical offset of the local commitlog; if there is no commitlog file yet, it is 0.

Slave Reports the Offset

// Step 2: if the interval since the last report has reached the maximum wait time, report immediately
if (this.isTimeToReportOffset()) {
    // report the slave offset
    boolean result = this.reportSlaveMaxOffset(this.currentReportedOffset);
    if (!result) {
        this.closeMaster();
    }
}


isTimeToReportOffset() checks whether the time since the last report exceeds the maximum wait interval (5 s by default). In other words, even if the slave hears nothing from the master for 5 s, it still sends a report request, which effectively doubles as a heartbeat.
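For reference, the check roughly looks like this (a paraphrased sketch; haSendHeartbeatInterval defaults to 5000 ms in MessageStoreConfig):

private boolean isTimeToReportOffset() {
    // time elapsed since the last write (report) to the master
    long interval = HAService.this.defaultMessageStore.getSystemClock().now() - this.lastWriteTimestamp;
    // report again once the heartbeat interval (default 5000 ms) has passed
    return interval > HAService.this.defaultMessageStore.getMessageStoreConfig().getHaSendHeartbeatInterval();
}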

Now let's look at the reportSlaveMaxOffset() method:

private boolean reportSlaveMaxOffset(final long maxOffset) {
    // set the write position to 0
    this.reportOffset.position(0);
    // writable length is 8 bytes
    this.reportOffset.limit(8);
    // the payload is the slave's current offset
    this.reportOffset.putLong(maxOffset);

    // switch from write mode to read mode:
    // set the read position back to 0
    this.reportOffset.position(0);
    // readable length is 8 bytes
    this.reportOffset.limit(8);

    for (int i = 0; i < 3 && this.reportOffset.hasRemaining(); i++) {
        try {
            // write the data into the channel
            this.socketChannel.write(this.reportOffset);
        } catch (IOException e) {
            log.error(this.getServiceName() + "reportSlaveMaxOffset this.socketChannel.write exception", e);
            return false;
        }
    }

    lastWriteTimestamp = HAService.this.defaultMessageStore.getSystemClock().now();
    return !this.reportOffset.hasRemaining();
}


As you can see, the slave's report packet is very simple: a single 8-byte offset.

One detail here: when switching the ByteBuffer from write mode to read mode, RocketMQ does not call flip() but sets position and limit by hand. Because NIO is non-blocking, a single write() call is not guaranteed to flush the whole ByteBuffer, which is also why the write is retried in a loop (up to 3 times) until nothing remains in the buffer.
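To make the buffer handling concrete, here is a minimal standalone sketch (illustrative only, not from the RocketMQ source) showing that flip() after the 8-byte put leaves the buffer in the same state as the manual position(0)/limit(8) calls above:

import java.nio.ByteBuffer;

public class FlipVsManual {
    public static void main(String[] args) {
        ByteBuffer a = ByteBuffer.allocate(8);
        a.putLong(12345L);
        a.flip(); // limit = position (8), position = 0

        ByteBuffer b = ByteBuffer.allocate(8);
        b.putLong(12345L);
        b.position(0); // manual equivalent used in reportSlaveMaxOffset()
        b.limit(8);

        // both buffers are now ready to be drained by repeated channel.write() calls
        System.out.println(a.position() + "/" + a.limit()); // 0/8
        System.out.println(b.position() + "/" + b.limit()); // 0/8
    }
}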

Slave Processes the Replicated Data

In step 3, processReadEvent() handles the data returned by the master. Before reading the code we already know that what the master sends over is the not-yet-replicated messages, so all the slave has to do is append that data to its local commitlog.

private boolean processReadEvent() {
    // number of consecutive reads that returned 0 bytes
    int readSizeZeroTimes = 0;
    // keep reading from the channel while the buffer still has free space
    while (this.byteBufferRead.hasRemaining()) {
        try {
            int readSize = this.socketChannel.read(this.byteBufferRead);
            if (readSize > 0) {
                readSizeZeroTimes = 0;
                boolean result = this.dispatchReadRequest();
                if (!result) {
                    log.error("HAClient, dispatchReadRequest error");
                    return false;
                }
            } else if (readSize == 0) {
                // three consecutive empty reads mean there is no more data for now,
                // so stop reading and return to the outer loop
                if (++readSizeZeroTimes >= 3) {
                    break;
                }
            } else {
                log.info("HAClient, processReadEvent read socket < 0");
                return false;
            }
        } catch (IOException e) {
            log.info("HAClient, processReadEvent read socket exception", e);
            return false;
        }
    }

    return true;
}


The method calls dispatchReadRequest() to process the bytes it has read; let's look at that method:

private boolean dispatchReadRequest() {
    final int msgHeaderSize = 8 + 4; // phyoffset + size

    while (true) {
        int diff = this.byteBufferRead.position() - this.dispatchPosition;
        if (diff >= msgHeaderSize) {
            // commitlog offset on the master
            long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
            // size of the message body
            int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);
            // max offset of the local (slave) commitlog
            long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
            if (slavePhyOffset != 0) {
                // if the slave offset and the master offset differ, replication has gone wrong; stop syncing
                if (slavePhyOffset != masterPhyOffset) {
                    log.error("master pushed offset not equal the max phy offset in slave, SLAVE: " + slavePhyOffset + " MASTER: " + masterPhyOffset);
                    return false;
                }
            }

            // a complete frame (header + body) has arrived; append the message to the commitlog
            if (diff >= (msgHeaderSize + bodySize)) {
                // underlying byte array holding the received data
                byte[] bodyData = byteBufferRead.array();
                // start position of the message body
                int dataStart = this.dispatchPosition + msgHeaderSize;
                // append the data to the local commitlog
                HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData, dataStart, bodySize);
                this.dispatchPosition += msgHeaderSize + bodySize;

                if (!reportSlaveMaxOffsetPlus()) {
                    return false;
                }

                continue;
            }
        }

        if (!this.byteBufferRead.hasRemaining()) {
            this.reallocateByteBuffer();
        }

        break;
    }

    return true;
}


The overall logic falls into two parts: the first parses the request frame and extracts the message data, the second appends that data to the local commitlog.

Both parts are already clear from the code above and easy to follow, so they are not repeated here.
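To make the frame layout concrete, here is a small standalone sketch (illustrative only, not taken from the RocketMQ source) that builds and parses one frame in the format the master pushes: an 8-byte physical offset, a 4-byte body size, then the body bytes.

import java.nio.ByteBuffer;

public class HaFrameDemo {
    // frame pushed by the master: 8-byte phyOffset + 4-byte bodySize + body bytes
    static ByteBuffer buildFrame(long phyOffset, byte[] body) {
        ByteBuffer frame = ByteBuffer.allocate(8 + 4 + body.length);
        frame.putLong(phyOffset);
        frame.putInt(body.length);
        frame.put(body);
        frame.flip();
        return frame;
    }

    public static void main(String[] args) {
        byte[] body = "some commitlog bytes".getBytes();
        ByteBuffer frame = buildFrame(1024L, body);
        // parse it back the way dispatchReadRequest() does, with absolute gets
        long masterPhyOffset = frame.getLong(0);
        int bodySize = frame.getInt(8);
        System.out.println(masterPhyOffset + " " + bodySize); // 1024 20
    }
}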


 

RocketMQ Source Code Analysis - NameServer

In this part we look at how the NameServer stores the topic, broker, and cluster information.

I. Basic introduction to the rocketmq-namesrv module

As you can see from the source tree, the namesrv module itself contains only a handful of classes (it also reuses classes from other RocketMQ modules). NamesrvController sets up the Netty server, DefaultRequestProcessor dispatches the concrete operations requested by clients, and RouteInfoManager stores and serves the registration and route information.

II. Key classes

1. DefaultRequestProcessor

This class dispatches the concrete client operations, such as broker registration or a producer fetching the broker information for a topic.

@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    ...........
    switch (request.getCode()) {
        ..........
        case RequestCode.REGISTER_BROKER:
            Version brokerVersion = MQVersion.value2Version(request.getVersion());
            if (brokerVersion.ordinal() >= MQVersion.Version.V3_0_11.ordinal()) {
                return this.registerBrokerWithFilterServer(ctx, request);
            } else {
                return this.registerBroker(ctx, request);
            }
        case RequestCode.UNREGISTER_BROKER:
            return this.unregisterBroker(ctx, request);
        case RequestCode.GET_ROUTEINFO_BY_TOPIC:
            return this.getRouteInfoByTopic(ctx, request);
        case RequestCode.GET_BROKER_CLUSTER_INFO:
            return this.getBrokerClusterInfo(ctx, request);
        case RequestCode.WIPE_WRITE_PERM_OF_BROKER:
            return this.wipeWritePermOfBroker(ctx, request);
        case RequestCode.GET_ALL_TOPIC_LIST_FROM_NAMESERVER:
            return getAllTopicListFromNameserver(ctx, request);
        case RequestCode.DELETE_TOPIC_IN_NAMESRV:
            return deleteTopicInNamesrv(ctx, request);
        case RequestCode.GET_KVLIST_BY_NAMESPACE:
            return this.getKVListByNamespace(ctx, request);
        case RequestCode.GET_TOPICS_BY_CLUSTER:
            return this.getTopicsByCluster(ctx, request);
        case RequestCode.GET_SYSTEM_TOPIC_LIST_FROM_NS:
            return this.getSystemTopicListFromNs(ctx, request);
        case RequestCode.GET_UNIT_TOPIC_LIST:
            return this.getUnitTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_TOPIC_LIST:
            return this.getHasUnitSubTopicList(ctx, request);
        case RequestCode.GET_HAS_UNIT_SUB_UNUNIT_TOPIC_LIST:
            return this.getHasUnitSubUnUnitTopicList(ctx, request);
        case RequestCode.UPDATE_NAMESRV_CONFIG:
            return this.updateConfig(ctx, request);
        case RequestCode.GET_NAMESRV_CONFIG:
            return this.getConfig(ctx, request);
        default:
            break;
    }
    return null;
}

For example, RequestCode.REGISTER_BROKER handles broker registration, RequestCode.GET_TOPICS_BY_CLUSTER lets a client fetch the topics the namesrv knows for a cluster, GET_BROKER_CLUSTER_INFO returns the registered broker and cluster information, and GET_ROUTEINFO_BY_TOPIC returns the route information of a topic.

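As a rough illustration of what one of these requests looks like from the client side (a hedged sketch using the rocketmq-remoting API; the already-started remotingClient instance, the address 127.0.0.1:9876 and the topic name are assumptions for the example, not something from this article):

// Ask the NameServer for a topic's route info; this lands in the
// GET_ROUTEINFO_BY_TOPIC branch of processRequest() above.
GetRouteInfoRequestHeader header = new GetRouteInfoRequestHeader();
header.setTopic("cluster_queue_topic");
RemotingCommand request =
    RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, header);

RemotingCommand response = remotingClient.invokeSync("127.0.0.1:9876", request, 3000);
if (response.getCode() == ResponseCode.SUCCESS) {
    TopicRouteData routeData =
        RemotingSerializable.decode(response.getBody(), TopicRouteData.class);
}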
1) registerBroker

This method handles the broker registration request.

public RemotingCommand registerBroker(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    .........
    RegisterBrokerResult result = this.namesrvController.getRouteInfoManager().registerBroker(
        requestHeader.getClusterName(),
        requestHeader.getBrokerAddr(),
        requestHeader.getBrokerName(),
        requestHeader.getBrokerId(),
        requestHeader.getHaServerAddr(),
        topicConfigWrapper,
        null,
        ctx.channel()
    );

    responseHeader.setHaServerAddr(result.getHaServerAddr());
    responseHeader.setMasterAddr(result.getMasterAddr());

    byte[] jsonValue = this.namesrvController.getKvConfigManager().getKVListByNamespace(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG);
    response.setBody(jsonValue);
    response.setCode(ResponseCode.SUCCESS);
    response.setRemark(null);
    return response;
}

The actual registration is then delegated to RouteInfoManager.

2) getRouteInfoByTopic

public RemotingCommand getRouteInfoByTopic(ChannelHandlerContext ctx,
    RemotingCommand request) throws RemotingCommandException {
    final RemotingCommand response = RemotingCommand.createResponseCommand(null);
    final GetRouteInfoRequestHeader requestHeader =
        (GetRouteInfoRequestHeader) request.decodeCommandCustomHeader(GetRouteInfoRequestHeader.class);

    TopicRouteData topicRouteData = this.namesrvController.getRouteInfoManager().pickupTopicRouteData(requestHeader.getTopic());

    if (topicRouteData != null) {
        if (this.namesrvController.getNamesrvConfig().isOrderMessageEnable()) {
            String orderTopicConf =
                this.namesrvController.getKvConfigManager().getKVConfig(NamesrvUtil.NAMESPACE_ORDER_TOPIC_CONFIG,
                    requestHeader.getTopic());
            topicRouteData.setOrderTopicConf(orderTopicConf);
        }

        byte[] content = topicRouteData.encode();
        response.setBody(content);
        response.setCode(ResponseCode.SUCCESS);
        response.setRemark(null);
        return response;
    }

    response.setCode(ResponseCode.TOPIC_NOT_EXIST);
    response.setRemark("No topic route info in name server for the topic: " + requestHeader.getTopic()
        + FAQUrl.suggestTodo(FAQUrl.APPLY_TOPIC_URL));
    return response;
}

Fetching the TopicRouteData is likewise delegated to RouteInfoManager, so let's look at RouteInfoManager itself next.

2. RouteInfoManager

public class RouteInfoManager {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.NAMESRV_LOGGER_NAME);
    private final static long BROKER_CHANNEL_EXPIRED_TIME = 1000 * 60 * 2;
    private final ReadWriteLock lock = new ReentrantReadWriteLock();
    private final HashMap<String/* topic */, List<QueueData>> topicQueueTable;
    private final HashMap<String/* brokerName */, BrokerData> brokerAddrTable;
    private final HashMap<String/* clusterName */, Set<String/* brokerName */>> clusterAddrTable;
    private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;
    private final HashMap<String/* brokerAddr */, List<String>/* Filter Server */> filterServerTable;
    ..........
}

For this class, the key thing is to know what these member variables hold, because broker registration and topic route lookup all read and write them.

1) Preliminary setup

To make sense of these variables, I set up a cluster across three machines, with two broker clusters: DefaultCluster and DefaultCluster-2.

DefaultCluster is a two-master, two-slave setup. The relevant parts of each broker-xx.properties:

192.168.127.128:broker-a

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=0

192.168.127.128:broker-b-s

brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=1

192.168.127.129:broker-a-s

brokerClusterName=DefaultCluster
brokerName=broker-a
brokerId=1

192.168.127.129:broker-b

brokerClusterName=DefaultCluster
brokerName=broker-b
brokerId=0

The other cluster runs on a single machine, 192.168.127.130, which you normally would not do. (Also, when I sent messages they ended up on both broker clusters; to really isolate message sending per cluster, the clusters would presumably have to register with different NameServers.)

192.168.127.129:broker-c

brokerClusterName=DefaultCluster-2
brokerName=broker-c
brokerId=0

192.168.127.129:broker-c-s

brokerClusterName=DefaultCluster-2
brokerName=broker-c-s
brokerId=1

Now let's look at what each of these tables actually stores.

2) topicQueueTable (HashMap<String, List<QueueData>>)

public class QueueData implements Comparable<QueueData> {
    private String brokerName;
    private int readQueueNums;
    private int writeQueueNums;
    private int perm;
    private int topicSysFlag;
}

topicQueueTable stores the mapping between a topic and the brokers that serve it. Take the topic cluster_queue_topic as an example: in the cluster set up above, each broker creates 4 queues for it by default, and the topic is spread over 3 broker names (broker-a, broker-b and broker-c), so it has 3 * 4 = 12 MessageQueues in total. When a producer sends a message, it picks one of those 12 MessageQueues.

The topic's route information reflects the same layout, so from topicQueueTable we can tell which brokers a topic is published on and how its queues are distributed across them.

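As a rough illustration (a standalone sketch of the client-side idea, not NameServer code; the topic name and queue counts simply mirror the example above):

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.protocol.route.QueueData;

public class QueueExpansionDemo {
    private static final AtomicInteger counter = new AtomicInteger();

    public static void main(String[] args) {
        // fake QueueData for the three broker names in the example cluster
        List<QueueData> queueDatas = new ArrayList<>();
        for (String brokerName : new String[]{"broker-a", "broker-b", "broker-c"}) {
            QueueData qd = new QueueData();
            qd.setBrokerName(brokerName);
            qd.setWriteQueueNums(4);
            queueDatas.add(qd);
        }

        // expand into MessageQueues, similar in spirit to what the producer does
        List<MessageQueue> mqs = new ArrayList<>();
        for (QueueData qd : queueDatas) {
            for (int i = 0; i < qd.getWriteQueueNums(); i++) {
                mqs.add(new MessageQueue("cluster_queue_topic", qd.getBrokerName(), i));
            }
        }
        System.out.println(mqs.size()); // 3 * 4 = 12

        // round-robin selection of one MessageQueue per send
        MessageQueue selected = mqs.get(counter.getAndIncrement() % mqs.size());
        System.out.println(selected);
    }
}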
3) brokerAddrTable (HashMap<String, BrokerData>)

public class BrokerData implements Comparable<BrokerData> {
    private String cluster;
    private String brokerName;
    private HashMap<Long/* brokerId */, String/* broker address */> brokerAddrs;
}

This stores, for each brokerName, how its master and slave instances are distributed:

For example, broker-b belongs to the cluster DefaultCluster and has two instances: a master (brokerId = 0) on 192.168.127.129 and one slave (brokerId > 0) on 192.168.127.128.

4) clusterAddrTable (HashMap<String, Set<String>>)

This stores which brokerNames belong to each cluster.

5) brokerLiveTable (HashMap<String, BrokerLiveInfo>)

This mainly tracks when the last heartbeat from each broker was received.

private final HashMap<String/* brokerAddr */, BrokerLiveInfo> brokerLiveTable;

class BrokerLiveInfo {
    private long lastUpdateTimestamp;
    private DataVersion dataVersion;
    private Channel channel;
    private String haServerAddr;
}
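This table works together with the BROKER_CHANNEL_EXPIRED_TIME constant (2 minutes) seen in the field list earlier: a scheduled task in the NameServer periodically evicts brokers whose heartbeat is too old. Roughly, a paraphrased sketch of that scan (not a verbatim copy of the source):

public void scanNotActiveBroker() {
    Iterator<Map.Entry<String, BrokerLiveInfo>> it = this.brokerLiveTable.entrySet().iterator();
    while (it.hasNext()) {
        Map.Entry<String, BrokerLiveInfo> next = it.next();
        long last = next.getValue().getLastUpdateTimestamp();
        // no heartbeat for more than 2 minutes: drop the channel and clean up the broker's route entries
        if ((last + BROKER_CHANNEL_EXPIRED_TIME) < System.currentTimeMillis()) {
            RemotingUtil.closeChannel(next.getValue().getChannel());
            it.remove();
            this.onChannelDestroy(next.getKey(), next.getValue().getChannel());
        }
    }
}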

6) registerBroker (REGISTER_BROKER)

public RegisterBrokerResult registerBroker(final String clusterName, final String brokerAddr,
    final String brokerName, final long brokerId, final String haServerAddr,
    final TopicConfigSerializeWrapper topicConfigWrapper, final List<String> filterServerList,
    final Channel channel) {
    RegisterBrokerResult result = new RegisterBrokerResult();
    try {
        try {
            this.lock.writeLock().lockInterruptibly();
            Set<String> brokerNames = this.clusterAddrTable.get(clusterName);
            if (null == brokerNames) {
                brokerNames = new HashSet<String>();
                this.clusterAddrTable.put(clusterName, brokerNames);
            }
            brokerNames.add(brokerName);

            boolean registerFirst = false;

            BrokerData brokerData = this.brokerAddrTable.get(brokerName);
            if (null == brokerData) {
                registerFirst = true;
                brokerData = new BrokerData(clusterName, brokerName, new HashMap<Long, String>());
                this.brokerAddrTable.put(brokerName, brokerData);
            }
            ..........
            String oldAddr = brokerData.getBrokerAddrs().put(brokerId, brokerAddr);
            registerFirst = registerFirst || (null == oldAddr);

            if (null != topicConfigWrapper
                && MixAll.MASTER_ID == brokerId) {
                if (this.isBrokerTopicConfigChanged(brokerAddr, topicConfigWrapper.getDataVersion())
                    || registerFirst) {
                    ConcurrentMap<String, TopicConfig> tcTable =
                        topicConfigWrapper.getTopicConfigTable();
                    if (tcTable != null) {
                        for (Map.Entry<String, TopicConfig> entry : tcTable.entrySet()) {
                            this.createAndUpdateQueueData(brokerName, entry.getValue());
                        }
                    }
                }
            }

            BrokerLiveInfo prevBrokerLiveInfo = this.brokerLiveTable.put(brokerAddr,
                new BrokerLiveInfo(
                    System.currentTimeMillis(),
                    topicConfigWrapper.getDataVersion(),
                    channel,
                    haServerAddr));
            .........
        } finally {
            this.lock.writeLock().unlock();
        }
    } catch (Exception e) {
        log.error("registerBroker Exception", e);
    }
    return result;
}

private final ReadWriteLock lock = new ReentrantReadWriteLock();

private void createAndUpdateQueueData(final String brokerName, final TopicConfig topicConfig) {
    QueueData queueData = new QueueData();
    queueData.setBrokerName(brokerName);
    queueData.setWriteQueueNums(topicConfig.getWriteQueueNums());
    queueData.setReadQueueNums(topicConfig.getReadQueueNums());
    queueData.setPerm(topicConfig.getPerm());
    queueData.setTopicSysFlag(topicConfig.getTopicSysFlag());

    List<QueueData> queueDataList = this.topicQueueTable.get(topicConfig.getTopicName());
    if (null == queueDataList) {
        queueDataList = new LinkedList<QueueData>();
        queueDataList.add(queueData);
        this.topicQueueTable.put(topicConfig.getTopicName(), queueDataList);
        log.info("new topic registered, {} {}", topicConfig.getTopicName(), queueData);
    } else {
        ..........
    }
}

The registration first takes the write lock, and then the information reported by the broker is written into the tables described above.

7) pickupTopicRouteData (GET_ROUTEINFO_BY_TOPIC)

public TopicRouteData pickupTopicRouteData(final String topic) {
    TopicRouteData topicRouteData = new TopicRouteData();
    boolean foundQueueData = false;
    boolean foundBrokerData = false;
    Set<String> brokerNameSet = new HashSet<String>();
    List<BrokerData> brokerDataList = new LinkedList<BrokerData>();
    ..........
}
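The elided part of the method roughly does the following (a paraphrased sketch, not the verbatim source): it takes the read lock, copies the topic's QueueData list out of topicQueueTable, collects the broker names that serve the topic, and copies the matching BrokerData out of brokerAddrTable into the TopicRouteData that is returned to the client.

topicRouteData.setBrokerDatas(brokerDataList);
try {
    try {
        this.lock.readLock().lockInterruptibly();
        List<QueueData> queueDataList = this.topicQueueTable.get(topic);
        if (queueDataList != null) {
            topicRouteData.setQueueDatas(queueDataList);
            foundQueueData = true;
            // collect the broker names that serve this topic ...
            for (QueueData qd : queueDataList) {
                brokerNameSet.add(qd.getBrokerName());
            }
            // ... and copy their BrokerData (cluster, brokerName, brokerId -> address)
            for (String brokerName : brokerNameSet) {
                BrokerData brokerData = this.brokerAddrTable.get(brokerName);
                if (brokerData != null) {
                    brokerDataList.add(new BrokerData(brokerData.getCluster(),
                        brokerData.getBrokerName(),
                        (HashMap<Long, String>) brokerData.getBrokerAddrs().clone()));
                    foundBrokerData = true;
                }
            }
        }
    } finally {
        this.lock.readLock().unlock();
    }
} catch (Exception e) {
    log.error("pickupTopicRouteData Exception", e);
}

if (foundBrokerData && foundQueueData) {
    return topicRouteData;
}
return null;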
