RocketMQ源码解析-Store篇

Posted _微风轻起

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码解析-Store篇相关的知识,希望对你有一定的参考价值。

这一篇我们主要来梳理下`RocketMQ`消息的存储,这一块的逻辑主要是在`rocketmq-store`模块

​ 我们对于这个模块的逻辑梳理主要是借助这些测试类来debug分析主要是MappedFileQueueMappedFileCommitLogMessageStoreConsumeQueueIndexFile这些类。我们主要是梳理这些类关于获取、存储消息的主要逻辑,梳理其的大致脉络。

一、MappedFile

​ 这个类主要对应操作的是我们的消息最终会写到的文件。

1、初始化构建

public class MappedFileTest 
    private final String storeMessage = "Once, there was a chance for me!";

    @Test
    public void testSelectMappedBuffer() throws IOException 
        MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64);
        boolean result = mappedFile.appendMessage(storeMessage.getBytes());
        assertThat(result).isTrue();

        SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
        byte[] data = new byte[storeMessage.length()];
        selectMappedBufferResult.getByteBuffer().get(data);
        String readString = new String(data);

        assertThat(readString).isEqualTo(storeMessage);
        ........
    

​ 我们看到这个测试用例主要是创建一个大小为1024 * 64的文件000:

private MappedByteBuffer mappedByteBuffer;

private void init(final String fileName, final int fileSize) throws IOException 
    this.fileName = fileName;
    this.fileSize = fileSize;
    this.file = new File(fileName);
    this.fileFromOffset = Long.parseLong(this.file.getName());
    boolean ok = false;

    ensureDirOK(this.file.getParent());

    try 
        this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
        this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
        TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
        TOTAL_MAPPED_FILES.incrementAndGet();
        ok = true;
    		..........

​ 同时会将文件最终映射到mappedByteBuffer,然后是通过mappedByteBuffer来处理这个文件的写入、读取(这些是java中nio的一些类,先不深入),我们的消息就会写在这个文件。

2、消息的写入

这里消息的写入有几种,例如 这些写入byte[],或者入参是MessageExt,或者批量这些。

1)、appendMessage(final byte[] data)

public boolean appendMessage(final byte[] data) 
    int currentPos = this.wrotePosition.get();

    if ((currentPos + data.length) <= this.fileSize) 
        try 
            this.fileChannel.position(currentPos);
            this.fileChannel.write(ByteBuffer.wrap(data));
         catch (Throwable e) 
            log.error("Error occurred when append message to mappedFile.", e);
        
        this.wrotePosition.addAndGet(data.length);
        return true;
    

    return false;

​ 如果是以byte[]方式的写入,由于其本身就是数组,处理是很简单的,直接通过fileChannel.write(ByteBuffer.wrap(data))写入,然后通过this.wrotePosition.addAndGet(data.length)来累加记录当前写入的多长的内容。

2)、MessageExt

​ 这个类就是发送的消息的信息,同时这些信息会写到文件中

public class MessageExt extends Message 
    private static final long serialVersionUID = 5720810158625748049L;

    private int queueId;

    private int storeSize;

    private long queueOffset;
    private int sysFlag;
    private long bornTimestamp;
    private SocketAddress bornHost;

    private long storeTimestamp;
    private SocketAddress storeHost;
    private String msgId;
    private long commitLogOffset;
    private int bodyCRC;
    private int reconsumeTimes;

    private long preparedTransactionOffset;

    public MessageExt() 
    
public class Message implements Serializable 
    private static final long serialVersionUID = 8445773977080406428L;

    private String topic;
    private int flag;
    private Map<String, String> properties;
    private byte[] body;
    private String transactionId;

3)、appendMessagesInner(final MessageExt messageExt,…)

public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb) 
    assert messageExt != null;
    assert cb != null;

    int currentPos = this.wrotePosition.get();

    if (currentPos < this.fileSize) 
        ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
        byteBuffer.position(currentPos);
        AppendMessageResult result = null;
        if (messageExt instanceof MessageExtBrokerInner) 
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
         else if (messageExt instanceof MessageExtBatch) 
            result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
         else 
            return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
        
        this.wrotePosition.addAndGet(result.getWroteBytes());
        this.storeTimestamp = result.getStoreTimestamp();
        return result;
    
    log.error("MappedFile.appendMessage return null, wrotePosition:  fileSize: ", currentPos, this.fileSize);
    return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);

这里首先是通过currentPos < this.fileSize判断当前的文件有没有写满,然后就是获取文件对应的ByteBuffer来处理后序的写入

public class MessageExtBrokerInner extends MessageExt 

如果不是批量写入:

public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
    final MessageExtBrokerInner msgInner) 
    // STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>

    // PHY OFFSET
    long wroteOffset = fileFromOffset + byteBuffer.position();

    this.resetByteBuffer(hostHolder, 8);
    String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);

    // Record ConsumeQueue information
    keyBuilder.setLength(0);
    keyBuilder.append(msgInner.getTopic());
    keyBuilder.append('-');
    keyBuilder.append(msgInner.getQueueId());
    String key = keyBuilder.toString();
    //private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable
    //这里首先是通过topic+queueId来从topicQueueTable中获取该条消息对应的队列ID已经写到哪里了,也就是该条消息从文件的哪里开始写入
    Long queueOffset = CommitLog.this.topicQueueTable.get(key);
    if (null == queueOffset) 
        queueOffset = 0L;
        CommitLog.this.topicQueueTable.put(key, queueOffset);
    
		............
    final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
    final int topicLength = topicData.length;

    final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;

    final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);

    // Exceeds the maximum message
    //判断该条消息体是否太大了,如果已经超过限制就不写入
    if (msgLen > this.maxMessageSize) 
        return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
    
    // Determines whether there is sufficient free space
    //判断当前文件是否已经写满了,是的话就返回`END_OF_FILE`
    if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) 
        	..........
        return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
            queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
    

    //private final ByteBuffer msgStoreItemMemory
    //下面的逻辑就是消息的信息的具体写入了
    // Initialization of storage space
    this.resetByteBuffer(msgStoreItemMemory, msgLen);
    // 1 TOTALSIZE
    this.msgStoreItemMemory.putInt(msgLen);
    // 2 MAGICCODE
    this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
    // 3 BODYCRC
    this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
    // 4 QUEUEID
    this.msgStoreItemMemory.putInt(msgInner.getQueueId());
    // 5 FLAG
    this.msgStoreItemMemory.putInt(msgInner.getFlag());
    // 6 QUEUEOFFSET
    this.msgStoreItemMemory.putLong(queueOffset);
    // 7 PHYSICALOFFSET
    this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
    // 8 SYSFLAG
    this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
    // 9 BORNTIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
    // 10 BORNHOST
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
    // 11 STORETIMESTAMP
    this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
    // 12 STOREHOSTADDRESS
    this.resetByteBuffer(hostHolder, 8);
    this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
    //this.msgBatchMemory.put(msgInner.getStoreHostBytes());
    // 13 RECONSUMETIMES
    this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
    // 14 Prepared Transaction Offset
    this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
    // 15 BODY
    this.msgStoreItemMemory.putInt(bodyLength);
    if (bodyLength > 0)
        this.msgStoreItemMemory.put(msgInner.getBody());
    // 16 TOPIC
    this.msgStoreItemMemory.put((byte) topicLength);
    this.msgStoreItemMemory.put(topicData);
    // 17 PROPERTIES
    this.msgStoreItemMemory.putShort((short) propertiesLength);
    if (propertiesLength > 0)
        this.msgStoreItemMemory.put(propertiesData);

    final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
    // Write messages to the queue buffer
    // 这里就是将消息从msgStoreItemMemory中写入到byteBuffer中,也就是我们的记录文件,这个byteBuffer是前面的入参  
    byteBuffer.put(this.msgStoreItemMemory
msgStoreItemMemory中写入到.array(), 0, msgLen);

    AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
 		.......
    return result;

​ 自此我们就完成了一条消息的写入。

二、MappedFileQueue

​ 这个类主要是用来管理MappedFile的,我们知道前面创建MappedFile是有设置其的文件大小,如果到了就需要新创建MappedFile,同时这些MappedFile从逻辑意义上来说是连续的,也就是position是一直增加的,加入第一个文件放入了0-1023的内容,则第二个文件是继续从1024-2047,一直连续,我们以其的demo来说明

public class MappedFileQueue 
		.........
    private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
@Test
public void testGetLastMappedFile() 
    final String fixedMsg = "0123456789abcdef";

    MappedFileQueue mappedFileQueue =
        new MappedFileQueue("target/unit_test_store/a/", 1024, null);

    for (int i = 0; i < 1024; i++) 
        MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
        assertThat(mappedFile).isNotNull();
        assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue();
    

    mappedFileQueue.shutdown(1000);
    mappedFileQueue.destroy();

​ 例如这个就是创建1024大小的文件,然后循环写入内容,所以肯定是会创建多个文件的。

例如这个文件就是从0000000000000000000000000000000000001024这样命名的。

然后获取的时候就通过便宜量计算其的index,然后通过this.mappedFiles.get(index)List中获取到对应的MappedFile

public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound) 
    try 
        MappedFile firstMappedFile = this.getFirstMappedFile();
        MappedFile lastMappedFile = this.getLastMappedFile();
        if (firstMappedFile != null && lastMappedFile != null) 
            if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize) 
                ........
             else 
                int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset() / this.以上是关于RocketMQ源码解析-Store篇的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ源码系列 消息store存储设计核心原理解析

RocketMQ源码解析-NameServer篇

RocketMQ源码解析-NameServer篇

RocketMQ源码解析-NameServer篇

RocketMQ源码—RocketMQ源码调试环境准备

6RocketMQ 源码解析之 Broker 启动(上)