RocketMQ Broker消息处理流程剩余源码解析

Posted 小王曾是少年

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ Broker消息处理流程剩余源码解析相关的知识,希望对你有一定的参考价值。

🍊 Java学习:Java从入门到精通总结

🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想

🍊 绝对不一样的职场干货:大厂最佳实践经验指南


📆 最近更新:2023年3月4日

🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD

🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!


文章目录

DefaultMessageStore

接上文,SendMessageProcessor对象接收到消息之后,会把消息变成存储对象DefaultStoreMessage实例:

DefaultMessageStore的默认存储消息的方法asyncPutMessage如下:

@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg) 
    PutMessageStatus checkStoreStatus = this.checkStoreStatus();
    if (checkStoreStatus != PutMessageStatus.PUT_OK) 
        return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
    

    PutMessageStatus msgCheckStatus = this.checkMessage(msg);
    if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL) 
        return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
    

    long beginTime = this.getSystemClock().now();
    CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);

    putResultFuture.thenAccept((result) -> 
        long elapsedTime = this.getSystemClock().now() - beginTime;
        if (elapsedTime > 500) 
            log.warn("putMessage not in lock elapsed time(ms)=, bodyLength=", elapsedTime, msg.getBody().length);
        
        this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);

        if (null == result || !result.isOk()) 
            this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
        
    );

    return putResultFuture;

整个存储流程的代码还是比较清晰的:

  1. 先判断消息是否投放成功 及 检查消息的格式
  2. 上述检查都没问题时就会把消息存储在CommitLog
CompletableFuture<PutMessageResult> putResultFuturethis.commitLog.asyncPutMessage(msg);

CommitLog

Broker接收到消息后,最终消息存储在Commitlog对象中,调用的是CommitLogputMessage方法

 public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg) 
        // Set the storage time
        msg.setStoreTimestamp(System.currentTimeMillis());
        // Set the message body BODY CRC (consider the most appropriate setting
        // on the client)
        msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
        // Back to Results
        AppendMessageResult result = null;

        StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();

        String topic = msg.getTopic();
        int queueId = msg.getQueueId();
		
		// 延时消息处理,事务为TRANSACTION_PREPARED_TYPE 和 TRANSACTION_ROLLBACK_TYPE 消息不支持延时投递
        final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
        if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
                || tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE) 
            // Delay Delivery
            if (msg.getDelayTimeLevel() > 0) 
                if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel()) 
                    msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
                
				// 存储消息时,延时消息进入SCHEDULE_TOPIC_XXXX的主题下
                topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
                // 消息队列编号 = 延迟级别 - 1
                queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());

                // Backup real topic, queueId
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
                MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
                msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));

                msg.setTopic(topic);
                msg.setQueueId(queueId);
            
        

        InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
        if (bornSocketAddress.getAddress() instanceof Inet6Address) 
            msg.setBornHostV6Flag();
        

        InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
        if (storeSocketAddress.getAddress() instanceof Inet6Address) 
            msg.setStoreHostAddressV6Flag();
        

        PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
        PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
        if (encodeResult != null) 
            return CompletableFuture.completedFuture(encodeResult);
        
        msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
        PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));

        long elapsedTimeInLock = 0;
        MappedFile unlockMappedFile = null;

		// 加锁,同一时刻只能有一个线程put消息
        putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
        try 
            MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
            long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
            this.beginTimeInLock = beginLockTimestamp;

            // Here settings are stored timestamp, in order to ensure an orderly
            // global
            msg.setStoreTimestamp(beginLockTimestamp);
			// 只有不存在映射文件或文件已存满,才进行创建
            if (null == mappedFile || mappedFile.isFull()) 
                mappedFile = this.mappedFileQueue.getLastMappedFile(0); 
            
            if (null == mappedFile) 
                log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                beginTimeInLock = 0;
                return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
            
			// 追加消息至MappedFile的缓存中,更新写入位置wrotePosition
            result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
            switch (result.getStatus()) 
                case PUT_OK:
                    break;
                // 当文件剩余空间不足时,创建新的MappedFile并写入
                case END_OF_FILE: 
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) 
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                    
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            

            elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
            beginTimeInLock = 0;
         finally 
        	// 释放锁
            putMessageLock.unlock();
        

        /** log **/

        if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable()) 
            this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
        

        PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);

        // Statistics
        storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
        storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());

        CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
        CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);

		// 异步刷盘流程
        return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) -> 
            if (flushStatus != PutMessageStatus.PUT_OK) 
                putMessageResult.setPutMessageStatus(flushStatus);
            
            if (replicaStatus != PutMessageStatus.PUT_OK) 
                putMessageResult.setPutMessageStatus(replicaStatus);
                if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT) 
                    log.error("do sync transfer other node, wait return, but failed, topic:  tags:  client address: ",
                            msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
                
            
            return putMessageResult;
        );
    

消息在CommitLog文件中是顺序存储的

RocketMQ消息存储在CommitLog文件里,最终落盘,对应的类为MappedFile,它是从MappedFileQueue中获取的,如果对象不存在,就会创建:

MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();

// 只有不存在映射文件或文件已存满,才进行创建
if (null == mappedFile || mappedFile.isFull()) 
    mappedFile = this.mappedFileQueue.getLastMappedFile(0); 

创建(获取)完成对象之后就会把消息插入到mappedFile里,如果文件放不下了,则会重新创建一个mappedFile来对其进行写入,最后就是使用异步Future的方式把消息持久化到磁盘上。


MappedFileQueue

上面的源码里首先从MappedFileQueue映射队列尾部中获取MappedFile对象:

public MappedFile getLastMappedFile() 
        MappedFile mappedFileLast = null;

        while (!this.mappedFiles.isEmpty()) 
            try 
                mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
                break;
             catch (IndexOutOfBoundsException e) 
                //continue;
             catch (Exception e) 
                log.error("getLastMappedFile has exception.", e);
                break;
            
        

        return mappedFileLast;
    

MappedFile对象为空时,表示MappedFile对象不存在,此时就需要重新创建一个MappedFile对象,相应的方法在MappedFileQueue

public class MappedFileQueue 
    private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
    private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
	
	// 批量删除文件上限
    private static final int DELETE_FILES_BATCH_MAX = 10;
	// 目录
    private final String storePath;
	// 每个映射文件的大小
    private final int mappedFileSize;
	// 映射文件数组
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
	// 分配MappedFile服务
    private final AllocateMappedFileService allocateMappedFileService;
	// 最后flush到的offset
    private long flushedWhere = 0;
    // 最后commit到的offset
    private long committedWhere = 0;
	// 最后保存的时间戳
	private volatile long storeTimestamp = 0;
	
	/**
	*	获取最后一个可写入的映射文件
	*	当最后一个文件已经满的时候,创建一个新的文件
	*/
	public MappedFile getLastMappedFile(final long startOffset, boolean needCreate) 
        long createOffset = -1;
        MappedFile mappedFileLast = getLastMappedFile();
		
		// 不存在映射文件
        if (mappedFileLast == null) 
        	// 计算从哪个offset开始
            createOffset = startOffset - (startOffset % this.mappedFileSize);
        
		
		// 最后一个文件已满
        if (mappedFileLast != null && mappedFileLast.isFull()) 
            createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
        
		
		// 创建文件
        if (createOffset != -1 && needCreate) 
            String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
            String nextNextFilePath = this.storePath + File.separator
                + UtilAll.offset2FileName(createOffset + this.mappedFileSize);
            MappedFile mappedFile = null;

            if (this.allocateMappedFileService != null) 
                mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
                    nextNextFilePath, this.mappedFileSize);
             else 
                t

以上是关于RocketMQ Broker消息处理流程剩余源码解析的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字

RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字

RocketMQ源码系列 broker启动流程源码解析

RocketMQ Broker对新消息的处理流程

RocketMQ Broker对新消息的处理流程

RocketMQ Broker对新消息的处理流程