RocketMQ源码系列 CommitLog 存取消息源码解析

Posted Dream_it_possible!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码系列 CommitLog 存取消息源码解析相关的知识,希望对你有一定的参考价值。

目录

一、存储消息

1) 消息的存储流程图

2) 消息存储的入口

3) 消息存储的规则

二、读取消息

原理解析

核心方法


        前面讲到了rocketmq的消息存储设计原理,我们知道commitlog是存储消息的目录,下面讲解commitlog一些核心方法的用法。

一、存储消息

1) 消息的存储流程图

        需要存储的消息先会通过NIO管道写入到内存池里,然后由定时任务写入到MappedFile里。

        processReadEvent() 方法由HAservice线程启动而执行,如果缓冲区里有数据那么会执行dispatchReadRequest() 方法。

      private boolean processReadEvent() {
            int readSizeZeroTimes = 0;
            while (this.byteBufferRead.hasRemaining()) {
                try {
                // 如果缓冲区还有数据
                    int readSize = this.socketChannel.read(this.byteBufferRead);
                    if (readSize > 0) {
                        readSizeZeroTimes = 0;
                        boolean result = this.dispatchReadRequest();
                        if (!result) {
                            log.error("HAClient, dispatchReadRequest error");
                            return false;
                        }
                    } else if (readSize == 0) {
                        if (++readSizeZeroTimes >= 3) {
                            break;
                        }
                    } else {
                        log.info("HAClient, processReadEvent read socket < 0");
                        return false;
                    }
                } catch (IOException e) {
                    log.info("HAClient, processReadEvent read socket exception", e);
                    return false;
                }
            }

            return true;
        }

将缓冲区的数据按批写入到磁盘

   private boolean dispatchReadRequest() {
            final int msgHeaderSize = 8 + 4; // phyoffset + size
            int readSocketPos = this.byteBufferRead.position();

            while (true) {
                int diff = this.byteBufferRead.position() - this.dispatchPosition;
                if (diff >= msgHeaderSize) {
                    long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
                // 每次从缓冲区读取8个字节
                    int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);

                    long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();

                    if (slavePhyOffset != 0) {
                        if (slavePhyOffset != masterPhyOffset) {
                            log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
                                    + slavePhyOffset + " MASTER: " + masterPhyOffset);
                            return false;
                        }
                    }
                // 
                    if (diff >= (msgHeaderSize + bodySize)) {
                        byte[] bodyData = new byte[bodySize];
                        this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);         

 // 从缓冲区中读取指定长度的消息数据到bodyData
                        this.byteBufferRead.get(bodyData);
//  将bodyData里的数据写入到磁盘
 HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);

                        this.byteBufferRead.position(readSocketPos);
                        this.dispatchPosition += msgHeaderSize + bodySize;

                        if (!reportSlaveMaxOffsetPlus()) {
                            return false;
                        }

                        continue;
                    }
                }

                if (!this.byteBufferRead.hasRemaining()) {
                    this.reallocateByteBuffer();
                }

                break;
            }

            return true;
        }

2) 消息存储的入口

org.apche.rocketmq.store.DefaultMessageStore是消息存储的入口类,DefaultMessageStore包含了4个存放消息的方法, 含同步和异步方式。

1) putMessage(MessageExtInnerBroker msg) ;  同步存放单个消息。

2) putMessages(MessageExtBatch messageExtBatch);  同步存放批量消息。

3) asyncPutMessage(MessageExtInnerBroker msg); 异步存放单个消息。

4) asyncPutMessage(MessageExtInnerBroker msg); 异步存放批量消息。

3) 消息存储的规则

        我们知道消息是由消息头和消息体组成的,如果消息体、properties过长rocketmq会怎么处理呢?

        其实在appendMessage时,会判断properties 和message的长度大小,如果不符合要求,即size过大,那么就会抛弃掉此条消息,然后在返回结果result对象的status里写入PROPERTIES_SIZE_EXCEEDED、MESSAGE_SIZE_EXCEEDED, 然后根据status抛出异常提示。


            result = mappedFile.appendMessage(msg, this.appendMessageCallback);
            switch (result.getStatus()) {
                case PUT_OK:
                    break;
                case END_OF_FILE:
                    unlockMappedFile = mappedFile;
                    // Create a new file, re-write the message
                    mappedFile = this.mappedFileQueue.getLastMappedFile(0);
                    if (null == mappedFile) {
                        // XXX: warn and notify me
                        log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
                        beginTimeInLock = 0;
                        return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
                    }
                    result = mappedFile.appendMessage(msg, this.appendMessageCallback);
                    break;
                case MESSAGE_SIZE_EXCEEDED:
                case PROPERTIES_SIZE_EXCEEDED:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
                case UNKNOWN_ERROR:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
                default:
                    beginTimeInLock = 0;
                    return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
            }

        如果消息的长度满足要求,但是消息的长度正好超过当前mappedFile可存储的最大长度,那么rocketmq会怎么处理呢?

        每次appendMeesage会返回一个result, 在appendMessage时会判断消息是否满足当前mappedfile剩余可用文件,如果不满足,那么将END_OF_FILE写入到result的status里, 同时会在旧文件里写一个结束标识CommitLog.BLANK_MAGIC_CODE 。然后根据END_OF_FILE会重新创建一个mappedFile, 然后将消息写入到新的mappedFile里。

判断消息是否能放入到当前文件的剩余长度里, 如果不能返appendMessageStatus.END_OF_FILE

 if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
                this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
                // 1 TOTALSIZE
                this.msgStoreItemMemory.putInt(maxBlank);
                // 2 MAGICCODE
                this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
                // 3 The remaining space may be any value
                // Here the length of the specially set maxBlank
                final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
                byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
                return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
                    queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
            }

二、读取消息

原理解析

       commitlog文件名为当前文件开始的全局偏移量,每个mssage按照pos+ size的方式记录, 在读的时候,我们只需要此消息message的全局offset就能找到是哪个commitlog, 找到哪个commitlog后,根据offset得到当前消息所在当前文件的pos, pos即为所在的位置,知道了位置,再根据长度来遍历指定长度个位置,就能拿到消息了。

核心方法

getMessage(final long offset, final int size)

1.  获取mappedFile文件的大小。

2.  根据全局偏移量定位到指定的MappedFIle。

3.  根据全局偏移量和文件大小计算出所在当前commitlog偏移量。

4.  根据当前偏移量和文件大小读取到消息。

  public SelectMappedBufferResult getMessage(final long offset, final int size) {
        // 获取到mappedFile文件大小
        int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
        // 根据全局偏移量获取到指定的MappedFile
        MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
        if (mappedFile != null) {
            // 根据全局偏移量计算出当前message所在的pos。
            int pos = (int) (offset % mappedFileSize);
            // 根据pos+size获取到message
            return mappedFile.selectMappedBuffer(pos, size);
        }
        return null;
    }

以上是关于RocketMQ源码系列 CommitLog 存取消息源码解析的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码系列 消息store存储设计核心原理解析

RocketMQ 源码分析 —— Message 存储

源码分析 RocketMQ DLedger 多副本存储实现

rocketmq消息文件(commitlog)删除策略分析

RocketMQ的Broker是如何持久化存储消息

RocketMQ系列(七):主从同步