RocketMQ Broker消息处理流程剩余源码解析
Posted 小王曾是少年
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ Broker消息处理流程剩余源码解析相关的知识,希望对你有一定的参考价值。
🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2023年3月4日
🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
文章目录
DefaultMessageStore
接上文,SendMessageProcessor
对象接收到消息之后,会把消息变成存储对象DefaultStoreMessage
实例:
DefaultMessageStore
的默认存储消息的方法asyncPutMessage
如下:
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg)
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
if (checkStoreStatus != PutMessageStatus.PUT_OK)
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL)
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
long beginTime = this.getSystemClock().now();
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept((result) ->
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500)
log.warn("putMessage not in lock elapsed time(ms)=, bodyLength=", elapsedTime, msg.getBody().length);
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk())
this.storeStatsService.getPutMessageFailedTimes().incrementAndGet();
);
return putResultFuture;
整个存储流程的代码还是比较清晰的:
- 先判断消息是否投放成功 及 检查消息的格式
- 上述检查都没问题时就会把消息存储在
CommitLog
里
CompletableFuture<PutMessageResult> putResultFuturethis.commitLog.asyncPutMessage(msg);
CommitLog
Broker
接收到消息后,最终消息存储在Commitlog
对象中,调用的是CommitLog
的putMessage
方法
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg)
// Set the storage time
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
int queueId = msg.getQueueId();
// 延时消息处理,事务为TRANSACTION_PREPARED_TYPE 和 TRANSACTION_ROLLBACK_TYPE 消息不支持延时投递
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE)
// Delay Delivery
if (msg.getDelayTimeLevel() > 0)
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel())
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
// 存储消息时,延时消息进入SCHEDULE_TOPIC_XXXX的主题下
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
// 消息队列编号 = 延迟级别 - 1
queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
msg.setTopic(topic);
msg.setQueueId(queueId);
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address)
msg.setBornHostV6Flag();
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address)
msg.setStoreHostAddressV6Flag();
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null)
return CompletableFuture.completedFuture(encodeResult);
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
// 加锁,同一时刻只能有一个线程put消息
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
msg.setStoreTimestamp(beginLockTimestamp);
// 只有不存在映射文件或文件已存满,才进行创建
if (null == mappedFile || mappedFile.isFull())
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile)
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
// 追加消息至MappedFile的缓存中,更新写入位置wrotePosition
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
switch (result.getStatus())
case PUT_OK:
break;
// 当文件剩余空间不足时,创建新的MappedFile并写入
case END_OF_FILE:
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile)
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
beginTimeInLock = 0;
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
beginTimeInLock = 0;
finally
// 释放锁
putMessageLock.unlock();
/** log **/
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable())
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).incrementAndGet();
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).addAndGet(result.getWroteBytes());
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
CompletableFuture<PutMessageStatus> replicaResultFuture = submitReplicaRequest(result, msg);
// 异步刷盘流程
return flushResultFuture.thenCombine(replicaResultFuture, (flushStatus, replicaStatus) ->
if (flushStatus != PutMessageStatus.PUT_OK)
putMessageResult.setPutMessageStatus(flushStatus);
if (replicaStatus != PutMessageStatus.PUT_OK)
putMessageResult.setPutMessageStatus(replicaStatus);
if (replicaStatus == PutMessageStatus.FLUSH_SLAVE_TIMEOUT)
log.error("do sync transfer other node, wait return, but failed, topic: tags: client address: ",
msg.getTopic(), msg.getTags(), msg.getBornHostNameString());
return putMessageResult;
);
消息在
CommitLog
文件中是顺序存储的
RocketMQ
消息存储在CommitLog
文件里,最终落盘,对应的类为MappedFile
,它是从MappedFileQueue
中获取的,如果对象不存在,就会创建:
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
// 只有不存在映射文件或文件已存满,才进行创建
if (null == mappedFile || mappedFile.isFull())
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
创建(获取)完成对象之后就会把消息插入到mappedFile
里,如果文件放不下了,则会重新创建一个mappedFile
来对其进行写入,最后就是使用异步Future
的方式把消息持久化到磁盘上。
MappedFileQueue
上面的源码里首先从MappedFileQueue
映射队列尾部中获取MappedFile
对象:
public MappedFile getLastMappedFile()
MappedFile mappedFileLast = null;
while (!this.mappedFiles.isEmpty())
try
mappedFileLast = this.mappedFiles.get(this.mappedFiles.size() - 1);
break;
catch (IndexOutOfBoundsException e)
//continue;
catch (Exception e)
log.error("getLastMappedFile has exception.", e);
break;
return mappedFileLast;
当MappedFile
对象为空时,表示MappedFile
对象不存在,此时就需要重新创建一个MappedFile
对象,相应的方法在MappedFileQueue
里
public class MappedFileQueue
private static final InternalLogger log = InternalLoggerFactory.getLogger(LoggerName.STORE_LOGGER_NAME);
private static final InternalLogger LOG_ERROR = InternalLoggerFactory.getLogger(LoggerName.STORE_ERROR_LOGGER_NAME);
// 批量删除文件上限
private static final int DELETE_FILES_BATCH_MAX = 10;
// 目录
private final String storePath;
// 每个映射文件的大小
private final int mappedFileSize;
// 映射文件数组
private final CopyOnWriteArrayList<MappedFile> mappedFiles = new CopyOnWriteArrayList<MappedFile>();
// 分配MappedFile服务
private final AllocateMappedFileService allocateMappedFileService;
// 最后flush到的offset
private long flushedWhere = 0;
// 最后commit到的offset
private long committedWhere = 0;
// 最后保存的时间戳
private volatile long storeTimestamp = 0;
/**
* 获取最后一个可写入的映射文件
* 当最后一个文件已经满的时候,创建一个新的文件
*/
public MappedFile getLastMappedFile(final long startOffset, boolean needCreate)
long createOffset = -1;
MappedFile mappedFileLast = getLastMappedFile();
// 不存在映射文件
if (mappedFileLast == null)
// 计算从哪个offset开始
createOffset = startOffset - (startOffset % this.mappedFileSize);
// 最后一个文件已满
if (mappedFileLast != null && mappedFileLast.isFull())
createOffset = mappedFileLast.getFileFromOffset() + this.mappedFileSize;
// 创建文件
if (createOffset != -1 && needCreate)
String nextFilePath = this.storePath + File.separator + UtilAll.offset2FileName(createOffset);
String nextNextFilePath = this.storePath + File.separator
+ UtilAll.offset2FileName(createOffset + this.mappedFileSize);
MappedFile mappedFile = null;
if (this.allocateMappedFileService != null)
mappedFile = this.allocateMappedFileService.putRequestAndReturnMappedFile(nextFilePath,
nextNextFilePath, this.mappedFileSize);
else
t以上是关于RocketMQ Broker消息处理流程剩余源码解析的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字