RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字相关的知识,希望对你有一定的参考价值。

基于RocketMQ release-4.9.3,深入的介绍了Broker存储消息的源码,以及存储的高性能设计。

此前我们学习了RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic的整体流程,从流程中我们知道asyncPutMessage方法真正的用来存储消息。现在我们来看看这个方法的源码。

文章目录

1 asyncPutMessage存储普通消息

普通消息的处理、存储入口方法是DefaultMessageStore#asyncPutMessage方法。

首先会调用checkStoreStatus、checkMessage、checkLmqMessage方法进行一系列的前置校验,如果通过了,则调用CommitLog#asyncPutMessage方法真正的存储消息,最后会更新耗费的时间或者失败次数。

/**
* DefaultMessageStore的方法
* <p>
* 处理、存储消息
*
* @param msg 需要存储的MessageInstance
*/
@Override
    public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) 
    /*
* 1 检查存储状态
*/
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
//如果不是PUT_OK就直接返回了
if (checkStoreStatus != PutMessageStatus.PUT_OK) 
    return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));

/*
* 2 检查消息
*/
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) 
    return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));

/*
* 2 检查 light message queue(LMQ),即微消息队列
*/
PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED) 
    return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));


//当前时间戳
long beginTime = this.getSystemClock().now();
/*
* 核心方法,调用CommitLog#asyncPutMessage方法存储消息
*/
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

putResultFuture.thenAccept((result) -> 
    //存储消息消耗的时间
    long elapsedTime = this.getSystemClock().now() - beginTime;
    if (elapsedTime > 500) 
        log.warn("putMessage not in lock elapsed time(ms)=, bodyLength=", elapsedTime, msg.getBody().length);
    
    //更新统计保存消息花费的时间和最大花费的时间
    this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

    if (null == result || !result.isOk()) 
        //如果存储失败在,则增加保存消息失败的次数
        this.storeStatsService.getPutMessageFailedTimes().add(1);
    
);

return putResultFuture;

1.1 checkStoreStatus检查存储状态

首先就会检查消息存储状态,看是否支持写入消息:

  1. 如果DefaultMessageStore是shutdown状态,返回SERVICE_NOT_AVAILABLE。
  2. 如果broker是SLAVE角色,则返回SERVICE_NOT_AVAILABLE,不能将消息写入SLAVE角色。
  3. 如果不支持写入,那么返回SERVICE_NOT_AVAILABLE,可能因为broker的磁盘已满、写入逻辑队列错误、写入索引文件错误等等原因。
  4. 如果操作系统页缓存繁忙,则返回OS_PAGECACHE_BUSY,如果broker持有锁的时间超过osPageCacheBusyTimeOutMills,则算作操作系统页缓存繁忙。
  5. 返回PUT_OK,表示可以存储消息。
/**
 * DefaultMessageStore的方法
 * <p>
 * 检查存储状态
 */
private PutMessageStatus checkStoreStatus() 
    //如果DefaultMessageStore是shutdown状态,返回SERVICE_NOT_AVAILABLE
    if (this.shutdown) 
        log.warn("message store has shutdown, so putMessage is forbidden");
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
    
    //如果broker是SLAVE角色,则返回SERVICE_NOT_AVAILABLE,不能将消息写入SLAVE角色
    if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole()) 
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) 
            log.warn("broke role is slave, so putMessage is forbidden");
        
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
    
    //如果不支持写入,那么返回SERVICE_NOT_AVAILABLE
    //可能因为broker的磁盘已满、写入逻辑队列错误、写入索引文件错误等等原因
    if (!this.runningFlags.isWriteable()) 
        long value = this.printTimes.getAndIncrement();
        if ((value % 50000) == 0) 
            log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
                    "the broker's disk is full, write to logic queue error, write to index file error, etc");
        
        return PutMessageStatus.SERVICE_NOT_AVAILABLE;
     else 
        this.printTimes.set(0);
    
    //如果操作系统页缓存繁忙,则返回OS_PAGECACHE_BUSY
    //如果broker持有锁的时间超过osPageCacheBusyTimeOutMills,则算作操作系统页缓存繁忙
    if (this.isOSPageCacheBusy()) 
        return PutMessageStatus.OS_PAGECACHE_BUSY;
    
    //返回PUT_OK,表示可以存储消息
    return PutMessageStatus.PUT_OK;

1.2 checkMessage检查消息

检查消息,看是否符合要求:

  1. 如果topic长度大于127,则返回MESSAGE_ILLEGAL,表示topic过长了。
  2. 如果设置的属性长度大于32767,则返回MESSAGE_ILLEGAL,表示properties过长了。
  3. 否则,返回PUT_OK,表示检查通过。
/**
 * DefaultMessageStore的方法
 * <p>
 * 检查消息
 */
private PutMessageStatus checkMessage(MessageExtBrokerInner msg) 
    //如果topic长度大于127,则返回MESSAGE_ILLEGAL,表示topic过长了
    if (msg.getTopic().length() > Byte.MAX_VALUE) 
        log.warn("putMessage message topic length too long " + msg.getTopic().length());
        return PutMessageStatus.MESSAGE_ILLEGAL;
    
    //如果设置的属性长度大于32767,则返回MESSAGE_ILLEGAL,表示properties过长了
    if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE) 
        log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
        return PutMessageStatus.MESSAGE_ILLEGAL;
    
    return PutMessageStatus.PUT_OK;

2 CommitLog#asyncPutMessage异步存储消息

该方法中将会对消息进行真正的保存,即持久化操作,步骤比较繁杂,但同样属于RocketMQ源码的精髓,值得一看。其大概步骤为:

  1. 处理延迟消息的逻辑。
    1. 如果是延迟消息,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id, id = level - 1,如果延迟级别大于最大级别,则设置为最大级别18,,默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。
    2. 最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复。
  2. 消息编码。获取线程本地变量,其内部包含一个线程独立的encoder和keyBuilder对象。将消息内容编码,存储到encoder内部的encoderBuffer中,它是通过ByteBuffer.allocateDirect(size)得到的一个直接缓冲区。消息写入之后,会调用encoderBuffer.flip()方法,将Buffer从写模式切换到读模式,可以读取到数据。
  3. 加锁并写入消息。
    1. 一个broker将所有的消息都追加到同一个逻辑CommitLog日志文件中,因此需要通过获取putMessageLock锁来控制并发。有两种锁,一种是ReentrantLock可重入锁,另一种spin则是CAS锁。根据StoreConfig的useReentrantLockWhenPutMessage决定是否使用可重入锁,默认为true,使用可重入锁。
    2. 从mappedFileQueue中的mappedFiles集合中获取最后一个MappedFile。如果最新mappedFile为null,或者mappedFile满了,那么会新建mappedFile。
    3. 通过mappedFile调用appendMessage方法追加消息,这里仅仅是追加消息到byteBuffer的内存中。如果是writeBuffer则表示消息写入了堆外内存中,如果是mappedByteBuffer,则表示消息写入了page chache中。总之,都是存储在内存之中。
    4. 追加成功之后解锁。如果是剩余空间不足,则会重新初始化一个MappedFile并再次尝试追加。
  4. 如果存在写满的MappedFile并且启用了文件内存预热,那么这里调用unlockMappedFile对MappedFile执行解锁。
  5. 更新消息统计信息。随后调用submitFlushRequest方法提交刷盘请求,将会根据刷盘策略进行刷盘。随后调用submitReplicaRequest方法提交副本请求,用于主从同步。
/**
     * CommitLog的方法
     * <p>
     * 异步存储消息
     *
     * @param msg
     * @return
     */
    public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) 
        // Set the storage time
        //设置存储时间
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        //设置消息正文CRC
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
//        int queueId msg.getQueueId();
        /*
         * 1 处理延迟消息的逻辑
         *
         * 替换topic和queueId,保存真实topic和queueId
         */
        //根据sysFlag获取事务状态,普通消息的sysFlag为0
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        //如果不是事务消息,或者commit提交事务小i
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) 
            // Delay Delivery
            //获取延迟级别,判断是否是延迟消息
            if (msg.getDelayTimeLevel() > 0) 
                //如果延迟级别大于最大级别,则设置为最大级别
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) 
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                
                //获取延迟队列的topic,固定为 SCHEDULE_TOPIC_XXXX
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                //根据延迟等级获取对应的延迟队列id, id = level - 1
                int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                //使用扩展属性REAL_TOPIC 记录真实topic
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                //使用扩展属性REAL_QID 记录真实queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
                //更改topic和queueId为延迟队列的topic和queueId
                msg.setTopic(topic);
                msg.setQueueId(queueId);
            
        
        //发送消息的地址
        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) 
            msg.setBornHostV6Flag();
        
        //存储消息的地址
        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) 
            msg.setStoreHostAddressV6Flag();
        
        /*
         * 2 消息编码
         */
        //获取线程本地变量,其内部包含一个线程独立的encoder和keyBuilder对象
        PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
        //将消息内容编码,存储到encoder内部的encoderBuffer中,它是通过ByteBuffer.allocateDirect(size)得到的一个直接缓冲区
        //消息写入之后,会调用encoderBuffer.flip()方法,将Buffer从写模式切换到读模式,可以读取到数据
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) 
            return CompletableFuture.completedFuture(encodeResult);
        
        //编码后的encoderBuffer暂时存入msg的encodedBuff中
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
        //存储消息上下文
        PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
        /*
         * 3 加锁并写入消息
         * 一个broker将所有的消息都追加到同一个逻辑CommitLog日志文件中,因此需要通过获取putMessageLock锁来控制并发。
         */
        //持有锁的时间
        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;
        /*
         * 有两种锁,一种是ReentrantLock可重入锁,另一种spin则是CAS锁
         * 根据StoreConfig的useReentrantLockWhenPutMessage决定是否使用可重入锁,默认为true,使用可重入锁。
         */
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try 
            /*
             * 从mappedFileQueue中的mappedFiles集合中获取最后一个MappedFile
             */
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            //加锁后的起始时间
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            //设置存储的时间戳为加锁后的起始时间,保证有序
            msg.setStoreTimestamp(beginLockTimestamp);
            /*
             * 如果最新mappedFile为null,或者mappedFile满了,那么会新建mappedFile并返回
             */
            if (null == mappedFile || mappedFile.isFull()) 
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
            
            if (null == mappedFile) 
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
            
            /*
             *  追加存储消息
             */
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            switch (result.getStatus()) 
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    //文件剩余空间不足,那么初始化新的文件并尝试再次存储
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) 
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                    
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            
            //加锁的持续时间
            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
         finally 
            //重置开始时间,释放锁
            beginTimeInLock = 0;
            putMessageLock.unlock();
        

        if (elapsedTimeInLock > 500) 
            log.warn("[NOTIFYME]putMessage in lock cost time(ms)=, bodyLength= AppendMessageResult=", elapsedTimeInLock, msg.getBody().length, result);
        
        //如果存在写满的MappedFile并且启用了文件内存预热,那么这里对MappedFile执行解锁
        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) 
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        //存储数据的统计信息更新
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
        /*
         * 4 提交刷盘请求,将会根据刷盘策略进行刷盘
         */
        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        RocketMQ源码系列 broker启动流程源码解析

RocketMQ源码—Broker启动流程源码解析一万字

RocketMQ源码—Broker与NameServer的心跳服务源码

rocketmq源码解析-namesrv与broker

7RocketMQ 源码解析之 Broker 启动(下)

7RocketMQ 源码解析之 Broker 启动(下)