RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字
Posted 刘Java
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字相关的知识,希望对你有一定的参考价值。
基于RocketMQ release-4.9.3,深入的介绍了Broker存储消息的源码,以及存储的高性能设计。
此前我们学习了RocketMQ源码(10)—Broker asyncSendMessage处理消息以及自动创建Topic的整体流程,从流程中我们知道asyncPutMessage方法真正的用来存储消息。现在我们来看看这个方法的源码。
文章目录
- 1 asyncPutMessage存储普通消息
- 2 CommitLog#asyncPutMessage异步存储消息
- 3 存储高性能设计总结
1 asyncPutMessage存储普通消息
普通消息的处理、存储入口方法是DefaultMessageStore#asyncPutMessage方法。
首先会调用checkStoreStatus、checkMessage、checkLmqMessage方法进行一系列的前置校验,如果通过了,则调用CommitLog#asyncPutMessage方法真正的存储消息,最后会更新耗费的时间或者失败次数。
/**
* DefaultMessageStore的方法
* <p>
* 处理、存储消息
*
* @param msg 需要存储的MessageInstance
*/
@Override
public CompletableFuture<PutMessageResult> asyncPutMessage(MessageExtBrokerInner msg)
/*
* 1 检查存储状态
*/
PutMessageStatus checkStoreStatus = this.checkStoreStatus();
//如果不是PUT_OK就直接返回了
if (checkStoreStatus != PutMessageStatus.PUT_OK)
return CompletableFuture.completedFuture(new PutMessageResult(checkStoreStatus, null));
/*
* 2 检查消息
*/
PutMessageStatus msgCheckStatus = this.checkMessage(msg);
if (msgCheckStatus == PutMessageStatus.MESSAGE_ILLEGAL)
return CompletableFuture.completedFuture(new PutMessageResult(msgCheckStatus, null));
/*
* 2 检查 light message queue(LMQ),即微消息队列
*/
PutMessageStatus lmqMsgCheckStatus = this.checkLmqMessage(msg);
if (msgCheckStatus == PutMessageStatus.LMQ_CONSUME_QUEUE_NUM_EXCEEDED)
return CompletableFuture.completedFuture(new PutMessageResult(lmqMsgCheckStatus, null));
//当前时间戳
long beginTime = this.getSystemClock().now();
/*
* 核心方法,调用CommitLog#asyncPutMessage方法存储消息
*/
CompletableFuture<PutMessageResult> putResultFuture = this.commitLog.asyncPutMessage(msg);
putResultFuture.thenAccept((result) ->
//存储消息消耗的时间
long elapsedTime = this.getSystemClock().now() - beginTime;
if (elapsedTime > 500)
log.warn("putMessage not in lock elapsed time(ms)=, bodyLength=", elapsedTime, msg.getBody().length);
//更新统计保存消息花费的时间和最大花费的时间
this.storeStatsService.setPutMessageEntireTimeMax(elapsedTime);
if (null == result || !result.isOk())
//如果存储失败在,则增加保存消息失败的次数
this.storeStatsService.getPutMessageFailedTimes().add(1);
);
return putResultFuture;
1.1 checkStoreStatus检查存储状态
首先就会检查消息存储状态,看是否支持写入消息:
- 如果DefaultMessageStore是shutdown状态,返回SERVICE_NOT_AVAILABLE。
- 如果broker是SLAVE角色,则返回SERVICE_NOT_AVAILABLE,不能将消息写入SLAVE角色。
- 如果不支持写入,那么返回SERVICE_NOT_AVAILABLE,可能因为broker的磁盘已满、写入逻辑队列错误、写入索引文件错误等等原因。
- 如果操作系统页缓存繁忙,则返回OS_PAGECACHE_BUSY,如果broker持有锁的时间超过osPageCacheBusyTimeOutMills,则算作操作系统页缓存繁忙。
- 返回PUT_OK,表示可以存储消息。
/**
* DefaultMessageStore的方法
* <p>
* 检查存储状态
*/
private PutMessageStatus checkStoreStatus()
//如果DefaultMessageStore是shutdown状态,返回SERVICE_NOT_AVAILABLE
if (this.shutdown)
log.warn("message store has shutdown, so putMessage is forbidden");
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
//如果broker是SLAVE角色,则返回SERVICE_NOT_AVAILABLE,不能将消息写入SLAVE角色
if (BrokerRole.SLAVE == this.messageStoreConfig.getBrokerRole())
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0)
log.warn("broke role is slave, so putMessage is forbidden");
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
//如果不支持写入,那么返回SERVICE_NOT_AVAILABLE
//可能因为broker的磁盘已满、写入逻辑队列错误、写入索引文件错误等等原因
if (!this.runningFlags.isWriteable())
long value = this.printTimes.getAndIncrement();
if ((value % 50000) == 0)
log.warn("the message store is not writable. It may be caused by one of the following reasons: " +
"the broker's disk is full, write to logic queue error, write to index file error, etc");
return PutMessageStatus.SERVICE_NOT_AVAILABLE;
else
this.printTimes.set(0);
//如果操作系统页缓存繁忙,则返回OS_PAGECACHE_BUSY
//如果broker持有锁的时间超过osPageCacheBusyTimeOutMills,则算作操作系统页缓存繁忙
if (this.isOSPageCacheBusy())
return PutMessageStatus.OS_PAGECACHE_BUSY;
//返回PUT_OK,表示可以存储消息
return PutMessageStatus.PUT_OK;
1.2 checkMessage检查消息
检查消息,看是否符合要求:
- 如果topic长度大于127,则返回MESSAGE_ILLEGAL,表示topic过长了。
- 如果设置的属性长度大于32767,则返回MESSAGE_ILLEGAL,表示properties过长了。
- 否则,返回PUT_OK,表示检查通过。
/**
* DefaultMessageStore的方法
* <p>
* 检查消息
*/
private PutMessageStatus checkMessage(MessageExtBrokerInner msg)
//如果topic长度大于127,则返回MESSAGE_ILLEGAL,表示topic过长了
if (msg.getTopic().length() > Byte.MAX_VALUE)
log.warn("putMessage message topic length too long " + msg.getTopic().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
//如果设置的属性长度大于32767,则返回MESSAGE_ILLEGAL,表示properties过长了
if (msg.getPropertiesString() != null && msg.getPropertiesString().length() > Short.MAX_VALUE)
log.warn("putMessage message properties length too long " + msg.getPropertiesString().length());
return PutMessageStatus.MESSAGE_ILLEGAL;
return PutMessageStatus.PUT_OK;
2 CommitLog#asyncPutMessage异步存储消息
该方法中将会对消息进行真正的保存,即持久化操作,步骤比较繁杂,但同样属于RocketMQ源码的精髓,值得一看。其大概步骤为:
- 处理延迟消息的逻辑。
- 如果是延迟消息,即DelayTimeLevel大于0,那么替换topic为SCHEDULE_TOPIC_XXXX,替换queueId为延迟队列id, id = level - 1,如果延迟级别大于最大级别,则设置为最大级别18,,默认延迟2h。这些参数可以在broker端配置类MessageStoreConfig中配置。
- 最后保存真实topic到消息的REAL_TOPIC属性,保存queueId到消息的REAL_QID属性,方便后面恢复。
- 消息编码。获取线程本地变量,其内部包含一个线程独立的encoder和keyBuilder对象。将消息内容编码,存储到encoder内部的encoderBuffer中,它是通过ByteBuffer.allocateDirect(size)得到的一个直接缓冲区。消息写入之后,会调用encoderBuffer.flip()方法,将Buffer从写模式切换到读模式,可以读取到数据。
- 加锁并写入消息。
- 一个broker将所有的消息都追加到同一个逻辑CommitLog日志文件中,因此需要通过获取putMessageLock锁来控制并发。有两种锁,一种是ReentrantLock可重入锁,另一种spin则是CAS锁。根据StoreConfig的useReentrantLockWhenPutMessage决定是否使用可重入锁,默认为true,使用可重入锁。
- 从mappedFileQueue中的mappedFiles集合中获取最后一个MappedFile。如果最新mappedFile为null,或者mappedFile满了,那么会新建mappedFile。
- 通过mappedFile调用appendMessage方法追加消息,这里仅仅是追加消息到byteBuffer的内存中。如果是writeBuffer则表示消息写入了堆外内存中,如果是mappedByteBuffer,则表示消息写入了page chache中。总之,都是存储在内存之中。
- 追加成功之后解锁。如果是剩余空间不足,则会重新初始化一个MappedFile并再次尝试追加。
- 如果存在写满的MappedFile并且启用了文件内存预热,那么这里调用unlockMappedFile对MappedFile执行解锁。
- 更新消息统计信息。随后调用submitFlushRequest方法提交刷盘请求,将会根据刷盘策略进行刷盘。随后调用submitReplicaRequest方法提交副本请求,用于主从同步。
/**
* CommitLog的方法
* <p>
* 异步存储消息
*
* @param msg
* @return
*/
public CompletableFuture<PutMessageResult> asyncPutMessage(final MessageExtBrokerInner msg)
// Set the storage time
//设置存储时间
msg.setStoreTimestamp(System.currentTimeMillis());
// Set the message body BODY CRC (consider the most appropriate setting
// on the client)
//设置消息正文CRC
msg.setBodyCRC(UtilAll.crc32(msg.getBody()));
// Back to Results
AppendMessageResult result = null;
StoreStatsService storeStatsService = this.defaultMessageStore.getStoreStatsService();
String topic = msg.getTopic();
// int queueId msg.getQueueId();
/*
* 1 处理延迟消息的逻辑
*
* 替换topic和queueId,保存真实topic和queueId
*/
//根据sysFlag获取事务状态,普通消息的sysFlag为0
final int tranType = MessageSysFlag.getTransactionValue(msg.getSysFlag());
//如果不是事务消息,或者commit提交事务小i
if (tranType == MessageSysFlag.TRANSACTION_NOT_TYPE
|| tranType == MessageSysFlag.TRANSACTION_COMMIT_TYPE)
// Delay Delivery
//获取延迟级别,判断是否是延迟消息
if (msg.getDelayTimeLevel() > 0)
//如果延迟级别大于最大级别,则设置为最大级别
if (msg.getDelayTimeLevel() > this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel())
msg.setDelayTimeLevel(this.defaultMessageStore.getScheduleMessageService().getMaxDelayLevel());
//获取延迟队列的topic,固定为 SCHEDULE_TOPIC_XXXX
topic = TopicValidator.RMQ_SYS_SCHEDULE_TOPIC;
//根据延迟等级获取对应的延迟队列id, id = level - 1
int queueId = ScheduleMessageService.delayLevel2QueueId(msg.getDelayTimeLevel());
// Backup real topic, queueId
//使用扩展属性REAL_TOPIC 记录真实topic
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_TOPIC, msg.getTopic());
//使用扩展属性REAL_QID 记录真实queueId
MessageAccessor.putProperty(msg, MessageConst.PROPERTY_REAL_QUEUE_ID, String.valueOf(msg.getQueueId()));
msg.setPropertiesString(MessageDecoder.messageProperties2String(msg.getProperties()));
//更改topic和queueId为延迟队列的topic和queueId
msg.setTopic(topic);
msg.setQueueId(queueId);
//发送消息的地址
InetSocketAddress bornSocketAddress = (InetSocketAddress) msg.getBornHost();
if (bornSocketAddress.getAddress() instanceof Inet6Address)
msg.setBornHostV6Flag();
//存储消息的地址
InetSocketAddress storeSocketAddress = (InetSocketAddress) msg.getStoreHost();
if (storeSocketAddress.getAddress() instanceof Inet6Address)
msg.setStoreHostAddressV6Flag();
/*
* 2 消息编码
*/
//获取线程本地变量,其内部包含一个线程独立的encoder和keyBuilder对象
PutMessageThreadLocal putMessageThreadLocal = this.putMessageThreadLocal.get();
//将消息内容编码,存储到encoder内部的encoderBuffer中,它是通过ByteBuffer.allocateDirect(size)得到的一个直接缓冲区
//消息写入之后,会调用encoderBuffer.flip()方法,将Buffer从写模式切换到读模式,可以读取到数据
PutMessageResult encodeResult = putMessageThreadLocal.getEncoder().encode(msg);
if (encodeResult != null)
return CompletableFuture.completedFuture(encodeResult);
//编码后的encoderBuffer暂时存入msg的encodedBuff中
msg.setEncodedBuff(putMessageThreadLocal.getEncoder().encoderBuffer);
//存储消息上下文
PutMessageContext putMessageContext = new PutMessageContext(generateKey(putMessageThreadLocal.getKeyBuilder(), msg));
/*
* 3 加锁并写入消息
* 一个broker将所有的消息都追加到同一个逻辑CommitLog日志文件中,因此需要通过获取putMessageLock锁来控制并发。
*/
//持有锁的时间
long elapsedTimeInLock = 0;
MappedFile unlockMappedFile = null;
/*
* 有两种锁,一种是ReentrantLock可重入锁,另一种spin则是CAS锁
* 根据StoreConfig的useReentrantLockWhenPutMessage决定是否使用可重入锁,默认为true,使用可重入锁。
*/
putMessageLock.lock(); //spin or ReentrantLock ,depending on store config
try
/*
* 从mappedFileQueue中的mappedFiles集合中获取最后一个MappedFile
*/
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//加锁后的起始时间
long beginLockTimestamp = this.defaultMessageStore.getSystemClock().now();
this.beginTimeInLock = beginLockTimestamp;
// Here settings are stored timestamp, in order to ensure an orderly
// global
//设置存储的时间戳为加锁后的起始时间,保证有序
msg.setStoreTimestamp(beginLockTimestamp);
/*
* 如果最新mappedFile为null,或者mappedFile满了,那么会新建mappedFile并返回
*/
if (null == mappedFile || mappedFile.isFull())
mappedFile = this.mappedFileQueue.getLastMappedFile(0); // Mark: NewFile may be cause noise
if (null == mappedFile)
log.error("create mapped file1 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, null));
/*
* 追加存储消息
*/
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
switch (result.getStatus())
case PUT_OK:
break;
case END_OF_FILE:
//文件剩余空间不足,那么初始化新的文件并尝试再次存储
unlockMappedFile = mappedFile;
// Create a new file, re-write the message
mappedFile = this.mappedFileQueue.getLastMappedFile(0);
if (null == mappedFile)
// XXX: warn and notify me
log.error("create mapped file2 error, topic: " + msg.getTopic() + " clientAddr: " + msg.getBornHostString());
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.CREATE_MAPEDFILE_FAILED, result));
result = mappedFile.appendMessage(msg, this.appendMessageCallback, putMessageContext);
break;
case MESSAGE_SIZE_EXCEEDED:
case PROPERTIES_SIZE_EXCEEDED:
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.MESSAGE_ILLEGAL, result));
case UNKNOWN_ERROR:
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
default:
return CompletableFuture.completedFuture(new PutMessageResult(PutMessageStatus.UNKNOWN_ERROR, result));
//加锁的持续时间
elapsedTimeInLock = this.defaultMessageStore.getSystemClock().now() - beginLockTimestamp;
finally
//重置开始时间,释放锁
beginTimeInLock = 0;
putMessageLock.unlock();
if (elapsedTimeInLock > 500)
log.warn("[NOTIFYME]putMessage in lock cost time(ms)=, bodyLength= AppendMessageResult=", elapsedTimeInLock, msg.getBody().length, result);
//如果存在写满的MappedFile并且启用了文件内存预热,那么这里对MappedFile执行解锁
if (null != unlockMappedFile && this.defaultMessageStore.getMessageStoreConfig().isWarmMapedFileEnable())
this.defaultMessageStore.unlockMappedFile(unlockMappedFile);
PutMessageResult putMessageResult = new PutMessageResult(PutMessageStatus.PUT_OK, result);
// Statistics
//存储数据的统计信息更新
storeStatsService.getSinglePutMessageTopicTimesTotal(msg.getTopic()).add(1);
storeStatsService.getSinglePutMessageTopicSizeTotal(topic).add(result.getWroteBytes());
/*
* 4 提交刷盘请求,将会根据刷盘策略进行刷盘
*/
CompletableFuture<PutMessageStatus> flushResultFuture = submitFlushRequest(result, msg);
RocketMQ源码系列 broker启动流程源码解析