RocketMQ源码解析-Store篇
Posted _微风轻起
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码解析-Store篇相关的知识,希望对你有一定的参考价值。
这一篇我们主要来梳理下`RocketMQ`消息的存储,这一块的逻辑主要是在`rocketmq-store`模块
我们对于这个模块的逻辑梳理主要是借助这些测试类来debug
分析主要是MappedFileQueue
、MappedFile
、CommitLog
、MessageStore
、ConsumeQueue
、IndexFile
这些类。我们主要是梳理这些类关于获取、存储消息的主要逻辑,梳理其的大致脉络。
一、MappedFile
这个类主要对应操作的是我们的消息最终会写到的文件。
1、初始化构建
public class MappedFileTest
private final String storeMessage = "Once, there was a chance for me!";
@Test
public void testSelectMappedBuffer() throws IOException
MappedFile mappedFile = new MappedFile("target/unit_test_store/MappedFileTest/000", 1024 * 64);
boolean result = mappedFile.appendMessage(storeMessage.getBytes());
assertThat(result).isTrue();
SelectMappedBufferResult selectMappedBufferResult = mappedFile.selectMappedBuffer(0);
byte[] data = new byte[storeMessage.length()];
selectMappedBufferResult.getByteBuffer().get(data);
String readString = new String(data);
assertThat(readString).isEqualTo(storeMessage);
........
我们看到这个测试用例主要是创建一个大小为1024 * 64
的文件000
:
private MappedByteBuffer mappedByteBuffer;
private void init(final String fileName, final int fileSize) throws IOException
this.fileName = fileName;
this.fileSize = fileSize;
this.file = new File(fileName);
this.fileFromOffset = Long.parseLong(this.file.getName());
boolean ok = false;
ensureDirOK(this.file.getParent());
try
this.fileChannel = new RandomAccessFile(this.file, "rw").getChannel();
this.mappedByteBuffer = this.fileChannel.map(MapMode.READ_WRITE, 0, fileSize);
TOTAL_MAPPED_VIRTUAL_MEMORY.addAndGet(fileSize);
TOTAL_MAPPED_FILES.incrementAndGet();
ok = true;
..........
同时会将文件最终映射到mappedByteBuffer
,然后是通过mappedByteBuffer
来处理这个文件的写入、读取(这些是java中nio
的一些类,先不深入),我们的消息就会写在这个文件。
2、消息的写入
这里消息的写入有几种,例如 这些写入byte[]
,或者入参是MessageExt
,或者批量这些。
1)、appendMessage(final byte[] data)
public boolean appendMessage(final byte[] data)
int currentPos = this.wrotePosition.get();
if ((currentPos + data.length) <= this.fileSize)
try
this.fileChannel.position(currentPos);
this.fileChannel.write(ByteBuffer.wrap(data));
catch (Throwable e)
log.error("Error occurred when append message to mappedFile.", e);
this.wrotePosition.addAndGet(data.length);
return true;
return false;
如果是以byte[]
方式的写入,由于其本身就是数组,处理是很简单的,直接通过fileChannel.write(ByteBuffer.wrap(data))
写入,然后通过this.wrotePosition.addAndGet(data.length)
来累加记录当前写入的多长的内容。
2)、MessageExt
这个类就是发送的消息的信息,同时这些信息会写到文件中
public class MessageExt extends Message
private static final long serialVersionUID = 5720810158625748049L;
private int queueId;
private int storeSize;
private long queueOffset;
private int sysFlag;
private long bornTimestamp;
private SocketAddress bornHost;
private long storeTimestamp;
private SocketAddress storeHost;
private String msgId;
private long commitLogOffset;
private int bodyCRC;
private int reconsumeTimes;
private long preparedTransactionOffset;
public MessageExt()
public class Message implements Serializable
private static final long serialVersionUID = 8445773977080406428L;
private String topic;
private int flag;
private Map<String, String> properties;
private byte[] body;
private String transactionId;
3)、appendMessagesInner(final MessageExt messageExt,…)
public AppendMessageResult appendMessagesInner(final MessageExt messageExt, final AppendMessageCallback cb)
assert messageExt != null;
assert cb != null;
int currentPos = this.wrotePosition.get();
if (currentPos < this.fileSize)
ByteBuffer byteBuffer = writeBuffer != null ? writeBuffer.slice() : this.mappedByteBuffer.slice();
byteBuffer.position(currentPos);
AppendMessageResult result = null;
if (messageExt instanceof MessageExtBrokerInner)
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBrokerInner) messageExt);
else if (messageExt instanceof MessageExtBatch)
result = cb.doAppend(this.getFileFromOffset(), byteBuffer, this.fileSize - currentPos, (MessageExtBatch) messageExt);
else
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
this.wrotePosition.addAndGet(result.getWroteBytes());
this.storeTimestamp = result.getStoreTimestamp();
return result;
log.error("MappedFile.appendMessage return null, wrotePosition: fileSize: ", currentPos, this.fileSize);
return new AppendMessageResult(AppendMessageStatus.UNKNOWN_ERROR);
这里首先是通过currentPos < this.fileSize
判断当前的文件有没有写满,然后就是获取文件对应的ByteBuffer
来处理后序的写入
public class MessageExtBrokerInner extends MessageExt
如果不是批量写入:
public AppendMessageResult doAppend(final long fileFromOffset, final ByteBuffer byteBuffer, final int maxBlank,
final MessageExtBrokerInner msgInner)
// STORETIMESTAMP + STOREHOSTADDRESS + OFFSET <br>
// PHY OFFSET
long wroteOffset = fileFromOffset + byteBuffer.position();
this.resetByteBuffer(hostHolder, 8);
String msgId = MessageDecoder.createMessageId(this.msgIdMemory, msgInner.getStoreHostBytes(hostHolder), wroteOffset);
// Record ConsumeQueue information
keyBuilder.setLength(0);
keyBuilder.append(msgInner.getTopic());
keyBuilder.append('-');
keyBuilder.append(msgInner.getQueueId());
String key = keyBuilder.toString();
//private HashMap<String/* topic-queueid */, Long/* offset */> topicQueueTable
//这里首先是通过topic+queueId来从topicQueueTable中获取该条消息对应的队列ID已经写到哪里了,也就是该条消息从文件的哪里开始写入
Long queueOffset = CommitLog.this.topicQueueTable.get(key);
if (null == queueOffset)
queueOffset = 0L;
CommitLog.this.topicQueueTable.put(key, queueOffset);
............
final byte[] topicData = msgInner.getTopic().getBytes(MessageDecoder.CHARSET_UTF8);
final int topicLength = topicData.length;
final int bodyLength = msgInner.getBody() == null ? 0 : msgInner.getBody().length;
final int msgLen = calMsgLength(bodyLength, topicLength, propertiesLength);
// Exceeds the maximum message
//判断该条消息体是否太大了,如果已经超过限制就不写入
if (msgLen > this.maxMessageSize)
return new AppendMessageResult(AppendMessageStatus.MESSAGE_SIZE_EXCEEDED);
// Determines whether there is sufficient free space
//判断当前文件是否已经写满了,是的话就返回`END_OF_FILE`
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank)
..........
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
//private final ByteBuffer msgStoreItemMemory
//下面的逻辑就是消息的信息的具体写入了
// Initialization of storage space
this.resetByteBuffer(msgStoreItemMemory, msgLen);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(msgLen);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.MESSAGE_MAGIC_CODE);
// 3 BODYCRC
this.msgStoreItemMemory.putInt(msgInner.getBodyCRC());
// 4 QUEUEID
this.msgStoreItemMemory.putInt(msgInner.getQueueId());
// 5 FLAG
this.msgStoreItemMemory.putInt(msgInner.getFlag());
// 6 QUEUEOFFSET
this.msgStoreItemMemory.putLong(queueOffset);
// 7 PHYSICALOFFSET
this.msgStoreItemMemory.putLong(fileFromOffset + byteBuffer.position());
// 8 SYSFLAG
this.msgStoreItemMemory.putInt(msgInner.getSysFlag());
// 9 BORNTIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getBornTimestamp());
// 10 BORNHOST
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getBornHostBytes(hostHolder));
// 11 STORETIMESTAMP
this.msgStoreItemMemory.putLong(msgInner.getStoreTimestamp());
// 12 STOREHOSTADDRESS
this.resetByteBuffer(hostHolder, 8);
this.msgStoreItemMemory.put(msgInner.getStoreHostBytes(hostHolder));
//this.msgBatchMemory.put(msgInner.getStoreHostBytes());
// 13 RECONSUMETIMES
this.msgStoreItemMemory.putInt(msgInner.getReconsumeTimes());
// 14 Prepared Transaction Offset
this.msgStoreItemMemory.putLong(msgInner.getPreparedTransactionOffset());
// 15 BODY
this.msgStoreItemMemory.putInt(bodyLength);
if (bodyLength > 0)
this.msgStoreItemMemory.put(msgInner.getBody());
// 16 TOPIC
this.msgStoreItemMemory.put((byte) topicLength);
this.msgStoreItemMemory.put(topicData);
// 17 PROPERTIES
this.msgStoreItemMemory.putShort((short) propertiesLength);
if (propertiesLength > 0)
this.msgStoreItemMemory.put(propertiesData);
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
// Write messages to the queue buffer
// 这里就是将消息从msgStoreItemMemory中写入到byteBuffer中,也就是我们的记录文件,这个byteBuffer是前面的入参
byteBuffer.put(this.msgStoreItemMemory
msgStoreItemMemory中写入到.array(), 0, msgLen);
AppendMessageResult result = new AppendMessageResult(AppendMessageStatus.PUT_OK, wroteOffset, msgLen, msgId,
.......
return result;
自此我们就完成了一条消息的写入。
二、MappedFileQueue
这个类主要是用来管理MappedFile
的,我们知道前面创建MappedFile
是有设置其的文件大小,如果到了就需要新创建MappedFile
,同时这些MappedFile
从逻辑意义上来说是连续的,也就是position
是一直增加的,加入第一个文件放入了0-1023
的内容,则第二个文件是继续从1024-2047
,一直连续,我们以其的demo来说明
public class MappedFileQueue
.........
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
@Test
public void testGetLastMappedFile()
final String fixedMsg = "0123456789abcdef";
MappedFileQueue mappedFileQueue =
new MappedFileQueue("target/unit_test_store/a/", 1024, null);
for (int i = 0; i < 1024; i++)
MappedFile mappedFile = mappedFileQueue.getLastMappedFile(0);
assertThat(mappedFile).isNotNull();
assertThat(mappedFile.appendMessage(fixedMsg.getBytes())).isTrue();
mappedFileQueue.shutdown(1000);
mappedFileQueue.destroy();
例如这个就是创建1024
大小的文件,然后循环写入内容,所以肯定是会创建多个文件的。
例如这个文件就是从00000000000000000000
、00000000000000001024
这样命名的。
然后获取的时候就通过便宜量计算其的index
,然后通过this.mappedFiles.get(index)
从List
中获取到对应的MappedFile
public MappedFile findMappedFileByOffset(final long offset, final boolean returnFirstOnNotFound)
try
MappedFile firstMappedFile = this.getFirstMappedFile();
MappedFile lastMappedFile = this.getLastMappedFile();
if (firstMappedFile != null && lastMappedFile != null)
if (offset < firstMappedFile.getFileFromOffset() || offset >= lastMappedFile.getFileFromOffset() + this.mappedFileSize)
........
else
int index = (int) ((offset / this.mappedFileSize) - (firstMappedFile.getFileFromOffset以上是关于RocketMQ源码解析-Store篇的主要内容,如果未能解决你的问题,请参考以下文章