RocketMQ源码(14)—Broker CommitLogDispatcher 异步构建ConsumeQueue和IndexFile源码解析

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(14)—Broker CommitLogDispatcher 异步构建ConsumeQueue和IndexFile源码解析相关的知识,希望对你有一定的参考价值。

基于RocketMQ release-4.9.3,深入的介绍了Broker 消息重放服务ReputMessageService中基于 CommitLogDispatcher 异步构建ConsumeQueue和IndexFile的源码。

上一章我们学习了ReputMessageService消息重放服务的总体流程:RocketMQ源码(13)—Broker 消息重放服务ReputMessageService源码解析,这一篇文章我们将深入学习CommitLogDispatcherBuildConsumeQueue、CommitLogDispatcherBuildIndex到底是如何构建异步构建ConsumeQueue和IndexFile索引文件的。

文章目录

1 CommitLogDispatcherBuildConsumeQueue构建ConsumeQueue

CommitLogDispatcherBuildConsumeQueue用于接收分发请求并构建ConsumeQueue。

对于非事务消息或者是事务commit消息,则调用DefaultMessageStore#putMessagePositionInfo方法写入消息位置信息到consumeQueue,如果是事务prepared消息或者是事务rollback消息,则不进行处理。

class CommitLogDispatcherBuildConsumeQueue implements CommitLogDispatcher 
    /**
     * DefaultMessageStore的方法
     *
     * @param request 分派消息请求
     */
    @Override
    public void dispatch(DispatchRequest request) 
        //从该消息的消息系统flag中获取事务状态
        final int tranType = MessageSysFlag.getTransactionValue(request.getSysFlag());
        switch (tranType) 
            //如果不是事务消息或者是事务commit消息,则进行处理
            case MessageSysFlag.TRANSACTION_NOT_TYPE:
            case MessageSysFlag.TRANSACTION_COMMIT_TYPE:
                //写入消息位置信息到consumeQueue
                DefaultMessageStore.this.putMessagePositionInfo(request);
                break;
            //如果是事务prepared消息或者是事务rollback消息,则不进行处理
            case MessageSysFlag.TRANSACTION_PREPARED_TYPE:
            case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE:
                break;
        
    

1.1 putMessagePositionInfo写入消息位置信息

该方法首先调用findConsumeQueue方法根据topic和队列id确定需要写入的ConsumeQueue。然后调用ConsumeQueue#putMessagePositionInfoWrapper方法将消息信息追加到ConsumeQueue索引文件中。

/**
 * DefaultMessageStore的方法
 * 写入消息位置信息
 *
 * @param dispatchRequest 分派消息请求
 */
public void putMessagePositionInfo(DispatchRequest dispatchRequest) 
    /*
     * 根据topic和队列id确定ConsumeQueue
     */
    ConsumeQueue cq = this.findConsumeQueue(dispatchRequest.getTopic(), dispatchRequest.getQueueId());
    /*
     * 将消息信息追加到ConsumeQueue索引文件中
     */
    cq.putMessagePositionInfoWrapper(dispatchRequest, checkMultiDispatchQueue(dispatchRequest));

1.2 findConsumeQueue查找ConsumeQueue

该方法根据topic和队列id确定需要写入的ConsumeQueue,查找的目标就是consumeQueueTable缓存集合。还可以知道,ConsumeQueue文件是延迟创建的,即当需要到该ConsumeQueue的时候才会新建。

/**
 * DefaultMessageStore
 * <p>
 * 根据topic和队列id查找ConsumeQueue
 */
public ConsumeQueue findConsumeQueue(String topic, int queueId) 
    //从consumeQueueTable中获取该topic所有的队列
    ConcurrentMap<Integer, ConsumeQueue> map = consumeQueueTable.get(topic);
    //如果没有保存该topic的喜喜,那么存入一个空的map
    if (null == map) 
        ConcurrentMap<Integer, ConsumeQueue> newMap = new ConcurrentHashMap<Integer, ConsumeQueue>(128);
        ConcurrentMap<Integer, ConsumeQueue> oldMap = consumeQueueTable.putIfAbsent(topic, newMap);
        if (oldMap != null) 
            map = oldMap;
         else 
            map = newMap;
        
    
    // 从map中根据queueId 获取对应的 消费队列
    ConsumeQueue logic = map.get(queueId);
    //如果ConsumeQueue为null,那么新建,所以说ConsumeQueue是延迟创建的
    if (null == logic) 
        //新建ConsumeQueue
        ConsumeQueue newLogic = new ConsumeQueue(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueue(this.messageStoreConfig.getStorePathRootDir()),
                //单个文件大小,默认为可存储30W数据的大小,每条数据20Byte
                this.getMessageStoreConfig().getMappedFileSizeConsumeQueue(),
                this);
        //存入map中,如果已存在则取旧的
        ConsumeQueue oldLogic = map.putIfAbsent(queueId, newLogic);
        if (oldLogic != null) 
            logic = oldLogic;
         else 
            // light message queue(LMQ)
            if (MixAll.isLmq(topic)) 
                lmqConsumeQueueNum.getAndIncrement();
            
            logic = newLogic;
        
    

    return logic;

1.2.1 创建ConsumeQueue

创建ConsumeQueue的构造器方法如下,将会初始化各种属性,然后会初始化20个字节的堆外内存,用于临时存储单个索引,这段内存可循环使用。

public ConsumeQueue(
        final String topic,
        final int queueId,
        final String storePath,
        final int mappedFileSize,
        final DefaultMessageStore defaultMessageStore) 
    //各种属性
    this.storePath = storePath;
    //单个文件大小,默认为可存储30W数据的大小,每条数据20Byte
    this.mappedFileSize = mappedFileSize;
    this.defaultMessageStore = defaultMessageStore;

    this.topic = topic;
    this.queueId = queueId;
    //queue的路径 $HOME/store/consumequeue/topic/queueId/fileName
    String queueDir = this.storePath
            + File.separator + topic
            + File.separator + queueId;
    //创建mappedFileQueue,内部保存在该queueId下面的所有的consumeQueue文件集合mappedFiles相当于一个文件夹
    this.mappedFileQueue = new MappedFileQueue(queueDir, mappedFileSize, null);
    //分配20个字节的堆外内存,用于临时存储单个索引,这段内存可循环使用
    this.byteBufferIndex = ByteBuffer.allocate(CQ_STORE_UNIT_SIZE);
    //是否启用消息队列的扩展存储,默认false
    if (defaultMessageStore.getMessageStoreConfig().isEnableConsumeQueueExt()) 
        this.consumeQueueExt = new ConsumeQueueExt(
                topic,
                queueId,
                StorePathConfigHelper.getStorePathConsumeQueueExt(defaultMessageStore.getMessageStoreConfig().getStorePathRootDir()),
                defaultMessageStore.getMessageStoreConfig().getMappedFileSizeConsumeQueueExt(),
                defaultMessageStore.getMessageStoreConfig().getBitMapLengthConsumeQueueExt()
        );
    

ConsumeQueue文件可以看成是基于topic的commitlog索引文件,故ConsumeQueue文件夹的组织方式如下:topic/queue/file三层组织结构,具体存储路径为:$HOME/store/consumequeue/topic/queueId/fileName。

例如topic名为TopicTest,并且有四个队列,则该topic的ConsumeQueue的组织方式为:

1.3 putMessagePositionInfoWrapper追加消息索引

该方法用于构建消息索引信息并且存入找到的ConsumeQueue文件中。支持重试,最大重试30次。

/**
 * ConsumeQueue的方法
 * <p>
 * 将消息信息追加到ConsumeQueue索引文件中
 */
public void putMessagePositionInfoWrapper(DispatchRequest request, boolean multiQueue) 
    //最大重试次数30
    final int maxRetries = 30;
    //检查ConsumeQueue文件是否可写
    boolean canWrite = this.defaultMessageStore.getRunningFlags().isCQWriteable();
    //如果文件可写,并且重试次数小于30次,那么写入ConsumeQueue索引
    for (int i = 0; i < maxRetries && canWrite; i++) 
        //获取tagCode
        long tagsCode = request.getTagsCode();
        //如果支持扩展信息写入,默认false
        if (isExtWriteEnable()) 
            ConsumeQueueExt.CqExtUnit cqExtUnit = new ConsumeQueueExt.CqExtUnit();
            cqExtUnit.setFilterBitMap(request.getBitMap());
            cqExtUnit.setMsgStoreTime(request.getStoreTimestamp());
            cqExtUnit.setTagsCode(request.getTagsCode());

            long extAddr = this.consumeQueueExt.put(cqExtUnit);
            if (isExtAddr(extAddr)) 
                tagsCode = extAddr;
             else 
                log.warn("Save consume queue extend fail, So just save tagsCode! , topic:, queueId:, offset:", cqExtUnit,
                        topic, queueId, request.getCommitLogOffset());
            
        
        /*
         * 写入消息位置信息到ConsumeQueue中
         */
        boolean result = this.putMessagePositionInfo(request.getCommitLogOffset(),
                request.getMsgSize(), tagsCode, request.getConsumeQueueOffset());
        if (result) 
            if (this.defaultMessageStore.getMessageStoreConfig().getBrokerRole() == BrokerRole.SLAVE ||
                    this.defaultMessageStore.getMessageStoreConfig().isEnableDLegerCommitLog()) 
                //修改StoreCheckpoint中的physicMsgTimestamp:最新commitlog文件的刷盘时间戳,单位毫秒
                this.defaultMessageStore.getStoreCheckpoint().setPhysicMsgTimestamp(request.getStoreTimestamp());
            
            this.defaultMessageStore.getStoreCheckpoint().setLogicsMsgTimestamp(request.getStoreTimestamp());
            if (multiQueue) 
                multiDispatchLmqQueue(request, maxRetries);
            
            return;
         else 
            // XXX: warn and notify me
            log.warn("[BUG]put commit log position info to " + topic + ":" + queueId + " " + request.getCommitLogOffset()
                    + " failed, retry " + i + " times");

            try 
                Thread.sleep(1000);
             catch (InterruptedException e) 
                log.warn("", e);
            
        
    

    // XXX: warn and notify me
    log.error("[BUG]consume queue can not write,  ", this.topic, this.queueId);
    this.defaultMessageStore.getRunningFlags().makeLogicsQueueError();

1.3.1 putMessagePositionInfo写入消息位置信息

该方法将消息位置信息写入到ConsumeQueue文件中。大概步骤为:

  1. 校验如果消息偏移量+消息大小 小于等于ConsumeQueue已处理的最大物理偏移量。说明该消息已经被写过了,直接返回true。
  2. 将消息信息offset、size、tagsCode按照顺序存入临时缓冲区byteBufferIndex中。
  3. 调用getLastMappedFile方法,根据偏移量获取将要写入的最新ConsumeQueue文件的MappedFile,可能会新建ConsumeQueue文件。getLastMappedFile方法的源码我们此前学过了。
  4. 进行一系列校验,例如是否需要重设索引信息,是否存在写入错误等等。
  5. 更新消息最大物理偏移量maxPhysicOffset = 消息在CommitLog中的物理偏移量 + 消息的大小。
  6. 调用MappedFile#appendMessage方法将临时缓冲区中的索引信息追加到mappedFile的mappedByteBuffer中,并且更新wrotePosition的位置信息,到此构建ComsumeQueue完毕。

从该方法中我们可以知道一条消息在ConsumeQueue中的一个索引条目的存储方式,固定为8B的offset+4B的size+8BtagsCode,固定占用20B。

  1. offset,消息在CommitLog中的物理偏移量。
  2. size,消息大小。
  3. tagsCode,延迟消息就是消息投递时间,其他消息就是消息的tags的hashCode。
/**
 * 写入消息位置信息到ConsumeQueue中
 *
 * @param offset   消息在CommitLog中的物理偏移量
 * @param size     消息大小
 * @param tagsCode 消息tagsCode,延迟消息就是消息投递时间,其他消息就是消息的tags的hashCode
 * @param cqOffset 消息在消息消费队列的偏移量
 */
private boolean putMessagePositionInfo(final long offset, final int size, final long tagsCode,
                                       final long cqOffset) 
    //如果消息偏移量+消息大小 小于等于ConsumeQueue已处理的最大物理偏移量
    //说明该消息已经被写过了,直接返回true
    if (offset + size <= this.maxPhysicOffset) 
        log.warn("Maybe try to build consume queue repeatedly maxPhysicOffset= phyOffset=", maxPhysicOffset, offset);
        return true;
    
    /*
     * 将消息信息offset、size、tagsCode按照顺序存入临时缓冲区byteBufferIndex中
     */
    //position指针移到缓冲区头部
    this.byteBufferIndex.flip();
    //缓冲区的限制20B
    this.byteBufferIndex.limit(CQ_STORE_UNIT_SIZE);
    //存入8个字节长度的offset,消息在CommitLog中的物理偏移量
    this.byteBufferIndex.putLong(offset);
    //存入4个字节长度的size,消息大小
    this.byteBufferIndex.putInt(size);
    //存入8个字节长度的tagsCode,延迟消息就是消息投递时间,其他消息就是消息的tags的hashCode
    this.byteBufferIndex.putLong(tagsCode);
    //已存在索引数据的最大预计偏移量
    final long expectLogicOffset = cqOffset * CQ_STORE_UNIT_SIZE;
    /*
     * 根据偏移量获取将要写入的最新ConsumeQueue文件的MappedFile,可能会新建ConsumeQueue文件
     */
    MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile(expectLogicOffset);
    if (mappedFile != null) 
        //如果mappedFile是第一个创建的消费队列,并且消息在消费队列的偏移量不为0,并且消费队列写入指针为0
        //那么表示消费索引数据错误,需要重设索引信息
        if (mappedFile.isFirstCreateInQueue() && cqOffset != 0 && mappedFile.getWrotePosition() == 0) 
            //设置最小偏移量为预计偏移量
            this.minLogicOffset = expectLogicOffset;
            //设置刷盘最新位置,提交的最新位置
            this.mappedFileQueue.setFlushedWhere(expectLogicOffset);
            this.mappedFileQueue.setCommittedWhere(expectLogicOffset);
            //对该ConsumeQueue文件expectLogicOffset之前的位置填充前导0
            this.fillPreBlank(mappedFile, expectLogicOffset);
            log.info("fill pre blank space " + mappedFile.getFileName() + " " + expectLogicOffset + " "
                    + mappedFile.getWrotePosition());
        
        //如果消息在消费队列的偏移量不为0,即此前有数据
        if (cqOffset != 0) 
            //获取当前ConsumeQueue文件最新已写入物理偏移量
            long currentLogicOffset = mappedFile.getWrotePosition() + mappedFile.getFileFromOffset();
            //最新已写入物理偏移量大于预期偏移量,那么表示重复构建消费队列
            if (expectLogicOffset < currentLogicOffset) 
                log.warn("Build  consume queue repeatedly, expectLogicOffset:  currentLogicOffset:  Topic:  QID:  Diff: ",
                        expectLogicOffset, currentLogicOffset, this.topic, this.queueId, expectLogicOffset - currentLogicOffset);
                return true;
            
            //如果不相等,表示存在写入错误,正常情况下,两个值应该相等,因为一个索引条目固定大小20B
            if (expectLogicOffset != currentLogicOffset) 
                LOG_ERROR.warn(
                        "[BUG]logic queue order maybe wrong, expectLogicOffset:  currentLogicOffset:  Topic:  QID:  Diff: ",
                        expectLogicOffset,
                        currentLogicOffset,
                        this.topic,
                        this.queueId,
                        expectLogicOffset - currentLogicOffset
                );
            
        
        //更新消息最大物理偏移量 = 消息在CommitLog中的物理偏移量 + 消息的大小
        this.maxPhysicOffset = offset + size;
        /*
         * 将临时缓冲区中的索引信息追加到mappedFile的mappedByteBuffer中,并且更新wrotePosition的位置信息,到此构建ComsumeQueue完毕
         */
        return mappedFile.appendMessage(this.byteBufferIndex.array());
    
    return false;

1.3.1.1 MappedFile#appendMessage追加消息

该方法用于将数据追加到MappedFile,这里仅仅是追加到对应的mappedByteBuffer中,基于mmap技术仅仅是将数据写入pageCache中,并没有立即刷盘,而是依靠操作系统判断刷盘,这样保证了写入的高性能。

/**
 * MappedFile的方法
 * <p>
 * 追加消息
 *
 * @param data 追加的数据
 */
public boolean appendMessage(final byte[] data) 
    //获取写入位置
    int currentPos = this.wrotePosition.get();
    //如果当前位置加上消息大小小于等于文件大小,那么将消息写入mappedByteBuffer
    if ((currentPos + data.length) <= this.fileSize) 
        try 
            //消息写入mappedByteBuffer即可,并没有执行刷盘
            ByteBuffer buf = this.mappedByteBuffer.slice();
            buf.position(currentPos);
            buf.put(data);
         catch (Throwable e) 
            log.error("Error occurred when append message to mappedFile.", e);
        
        //更新写入位置
        this.wrotePosition.addAndGet(data.length);
        return true;
    

    return false;

2 CommitLogDispatcherBuildIndex构建IndexFile

CommitLogDispatcherBuildIndex用于接收分发请求并构建IndexFile。

首先判断是否支持消息Index,默认是支持的,那么调用IndexService#buildIndex方法构建。如果不存在则不构建,因此Index文件是否存在都不影响RocketMQ的正常运行,它进被用来提升根据keys或者时间范围查询消息的效率。

/**
 * DefaultMessageStore的方法
 * 写入消息位置信息到IndexFile
 *
 * @param request 分派消息请求
 */
@Override
public void dispatch(DispatchRequest request) 
    //是否支持IndexFile,默认true
    if (DefaultMessageStore.this.messageStoreConfig.isMessageIndexEnable()) 
        //构建Index
        DefaultMessageStore.this.indexService.buildIndex(request);
    

2.1 buildIndex构建Index索引

该方法用于为一条消息构建Index索引,大概步骤为:

  1. 通过retryGetAndCreateIndexFile方法获取或创建最新索引文件IndexFile,支持重试最多3次
  2. 判断当前消息在commitlog中的偏移量小于该文件的结束索引在commitlog中的偏移量,那么表示已为该消息构建Index索引,直接返回。如果该消息是事务回滚消息,则同样直接返回,不需要创建索引。
  3. 获取客户端生成的uniqId,也被称为msgId,从逻辑上代表客户端生成的唯一一条消息,如果uniqId不为null,那么调用putKey方法为uniqId构建索引。
  4. 获取客户端传递的keys,如果keys不为空,那么调用putKey方法为keys中的每一个key构建索引。
/**
 * I

以上是关于RocketMQ源码(14)—Broker CommitLogDispatcher 异步构建ConsumeQueue和IndexFile源码解析的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码—Broker启动流程源码解析一万字

RocketMQ源码—Broker与NameServer的心跳服务源码

rocketmq源码解析-namesrv与broker

7RocketMQ 源码解析之 Broker 启动(下)

7RocketMQ 源码解析之 Broker 启动(下)

RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic