RocketMQ源码系列 CommitLog 存取消息源码解析
Posted Dream_it_possible!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码系列 CommitLog 存取消息源码解析相关的知识,希望对你有一定的参考价值。
目录
前面讲到了rocketmq的消息存储设计原理,我们知道commitlog是存储消息的目录,下面讲解commitlog一些核心方法的用法。
一、存储消息
1) 消息的存储流程图
需要存储的消息先会通过NIO管道写入到内存池里,然后由定时任务写入到MappedFile里。
processReadEvent() 方法由HAservice线程启动而执行,如果缓冲区里有数据那么会执行dispatchReadRequest() 方法。
private boolean processReadEvent() {
int readSizeZeroTimes = 0;
while (this.byteBufferRead.hasRemaining()) {
try {
// 如果缓冲区还有数据
int readSize = this.socketChannel.read(this.byteBufferRead);
if (readSize > 0) {
readSizeZeroTimes = 0;
boolean result = this.dispatchReadRequest();
if (!result) {
log.error("HAClient, dispatchReadRequest error");
return false;
}
} else if (readSize == 0) {
if (++readSizeZeroTimes >= 3) {
break;
}
} else {
log.info("HAClient, processReadEvent read socket < 0");
return false;
}
} catch (IOException e) {
log.info("HAClient, processReadEvent read socket exception", e);
return false;
}
}
return true;
}
将缓冲区的数据按批写入到磁盘
private boolean dispatchReadRequest() {
final int msgHeaderSize = 8 + 4; // phyoffset + size
int readSocketPos = this.byteBufferRead.position();while (true) {
int diff = this.byteBufferRead.position() - this.dispatchPosition;
if (diff >= msgHeaderSize) {
long masterPhyOffset = this.byteBufferRead.getLong(this.dispatchPosition);
// 每次从缓冲区读取8个字节
int bodySize = this.byteBufferRead.getInt(this.dispatchPosition + 8);long slavePhyOffset = HAService.this.defaultMessageStore.getMaxPhyOffset();
if (slavePhyOffset != 0) {
if (slavePhyOffset != masterPhyOffset) {
log.error("master pushed offset not equal the max phy offset in slave, SLAVE: "
+ slavePhyOffset + " MASTER: " + masterPhyOffset);
return false;
}
}
//
if (diff >= (msgHeaderSize + bodySize)) {
byte[] bodyData = new byte[bodySize];
this.byteBufferRead.position(this.dispatchPosition + msgHeaderSize);// 从缓冲区中读取指定长度的消息数据到bodyData
this.byteBufferRead.get(bodyData);
// 将bodyData里的数据写入到磁盘
HAService.this.defaultMessageStore.appendToCommitLog(masterPhyOffset, bodyData);this.byteBufferRead.position(readSocketPos);
this.dispatchPosition += msgHeaderSize + bodySize;if (!reportSlaveMaxOffsetPlus()) {
return false;
}continue;
}
}if (!this.byteBufferRead.hasRemaining()) {
this.reallocateByteBuffer();
}break;
}return true;
}
2) 消息存储的入口
org.apche.rocketmq.store.DefaultMessageStore是消息存储的入口类,DefaultMessageStore包含了4个存放消息的方法, 含同步和异步方式。
1) putMessage(MessageExtInnerBroker msg) ; 同步存放单个消息。
2) putMessages(MessageExtBatch messageExtBatch); 同步存放批量消息。
3) asyncPutMessage(MessageExtInnerBroker msg); 异步存放单个消息。
4) asyncPutMessage(MessageExtInnerBroker msg); 异步存放批量消息。
3) 消息存储的规则
我们知道消息是由消息头和消息体组成的,如果消息体、properties过长rocketmq会怎么处理呢?
其实在appendMessage时,会判断properties 和message的长度大小,如果不符合要求,即size过大,那么就会抛弃掉此条消息,然后在返回结果result对象的status里写入PROPERTIES_SIZE_EXCEEDED、MESSAGE_SIZE_EXCEEDED, 然后根据status抛出异常提示。
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
switch (result.getStatus()) {
case PUT_OK:
break;
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile) {
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
}
result = mappedFile.appendMessage(msg, this.appendMessageCallback);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
}
如果消息的长度满足要求,但是消息的长度正好超过当前mappedFile可存储的最大长度,那么rocketmq会怎么处理呢?
每次appendMeesage会返回一个result, 在appendMessage时会判断消息是否满足当前mappedfile剩余可用文件,如果不满足,那么将END_OF_FILE写入到result的status里, 同时会在旧文件里写一个结束标识CommitLog.BLANK_MAGIC_CODE 。然后根据END_OF_FILE会重新创建一个mappedFile, 然后将消息写入到新的mappedFile里。
判断消息是否能放入到当前文件的剩余长度里, 如果不能返appendMessageStatus.END_OF_FILE
if ((msgLen + END_FILE_MIN_BLANK_LENGTH) > maxBlank) {
this.resetByteBuffer(this.msgStoreItemMemory, maxBlank);
// 1 TOTALSIZE
this.msgStoreItemMemory.putInt(maxBlank);
// 2 MAGICCODE
this.msgStoreItemMemory.putInt(CommitLog.BLANK_MAGIC_CODE);
// 3 The remaining space may be any value
// Here the length of the specially set maxBlank
final long beginTimeMills = CommitLog.this.defaultMessageStore.now();
byteBuffer.put(this.msgStoreItemMemory.array(), 0, maxBlank);
return new AppendMessageResult(AppendMessageStatus.END_OF_FILE, wroteOffset, maxBlank, msgId, msgInner.getStoreTimestamp(),
queueOffset, CommitLog.this.defaultMessageStore.now() - beginTimeMills);
}
二、读取消息
原理解析
commitlog文件名为当前文件开始的全局偏移量,每个mssage按照pos+ size的方式记录, 在读的时候,我们只需要此消息message的全局offset就能找到是哪个commitlog, 找到哪个commitlog后,根据offset得到当前消息所在当前文件的pos, pos即为所在的位置,知道了位置,再根据长度来遍历指定长度个位置,就能拿到消息了。
核心方法
getMessage(final long offset, final int size)
1. 获取mappedFile文件的大小。
2. 根据全局偏移量定位到指定的MappedFIle。
3. 根据全局偏移量和文件大小计算出所在当前commitlog偏移量。
4. 根据当前偏移量和文件大小读取到消息。
public SelectMappedBufferResult getMessage(final long offset, final int size) {
// 获取到mappedFile文件大小
int mappedFileSize = this.defaultMessageStore.getMessageStoreConfig().getMappedFileSizeCommitLog();
// 根据全局偏移量获取到指定的MappedFile
MappedFile mappedFile = this.mappedFileQueue.findMappedFileByOffset(offset, offset == 0);
if (mappedFile != null) {
// 根据全局偏移量计算出当前message所在的pos。
int pos = (int) (offset % mappedFileSize);
// 根据pos+size获取到message
return mappedFile.selectMappedBuffer(pos, size);
}
return null;
}
以上是关于RocketMQ源码系列 CommitLog 存取消息源码解析的主要内容,如果未能解决你的问题,请参考以下文章