RocketMQ Broker消息处理流程及部分源码解析
Posted 小王曾是少年
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ Broker消息处理流程及部分源码解析相关的知识,希望对你有一定的参考价值。
🍊 Java学习:Java从入门到精通总结
🍊 深入浅出RocketMQ设计思想:深入浅出RocketMQ设计思想
🍊 绝对不一样的职场干货:大厂最佳实践经验指南
📆 最近更新:2023年2月10日
🍊 个人简介:通信工程本硕 for NJU💪、Java程序员🌕。做过科研paper,发过专利,优秀的程序员不应该只是CRUD
🍊 点赞 👍 收藏 ⭐留言 📝 都是我最大的动力!
文章目录
消息处理流程
SendMessageProcessor
处理类接收到消息DefaultMessageStore
实例将消息变成IndexFile
、ConsumeQueue
和CommitLog
对象- 上述对象转成内存映射对象后进行落盘
消息存储目录结构
RocketMQ
的文件存储在store
文件夹里,里面包含commitlog
,config
,consumerqueue
,index
文件夹和abort
,checkpoint
两个文件。
文件夹:
commitlog
存储写入到commitLog
的消息内容config
存储配置信息consumerqueue
存储消费者队列信息index
存储消息队列的索引文件
文件:
abort
标记RocketMQ
是否正常退出checkpoint
存储commitlog
,config
,consumerqueue
,index
文件的刷盘时间
├── abort
├── checkpoint
├── commitlog
│ ├── 00000000000000000000
│ ├── 00000000001073741824
├── config
│ ├── consumerFilter.json
│ ├── consumerOffset.json
│ ├── delayOffset.json
│ ├── subscriptionGroup.json
│ ├── topics.json
├── consumequeue
│ ├── TopicA
│ ├── TopicB
│ ├── TopicC
├── index
│ ├── 00000000000000000000
│ ├── 00000000001073741824
RocketMQ
内有专门对应磁盘上存储文件的封装类:
CommitLog
:对应commitlog
文件ConsumeQueue
:对应consumerqueue
文件IndexFile
:对应index
文件MappedFile
:直接内存映射业务的封装类,通过操作该类实例,可以把消息写入内存映射缓冲区,或将消息刷盘MappedFileQueue
:连续物理存储的封装类,可以通过offset
快速定位消息所在的MappedFile
MappedFileBuff
:堆外内存
SendMessage
源码
SendMessageProcessor
是接收消息的一个钩子函数,该类的对象将会处理发送到Broker
的消息
processRequest
主要流程已在代码片注释中给出:
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor
private List<ConsumeMessageHook> consumeMessageHookList;
public SendMessageProcessor(final BrokerController brokerController)
super(brokerController);
public CompletableFuture<RemotingCommand> asyncProcessRequest(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException
final SendMessageContext mqtraceContext;
switch (request.getCode())
case RequestCode.CONSUMER_SEND_MSG_BACK:
return this.asyncConsumerSendMsgBack(ctx, request);
default:
// 解析请求体
SendMessageRequestHeader requestHeader = parseRequestHeader(request);
if (requestHeader == null)
return CompletableFuture.completedFuture(null);
// 建立消息上下文
mqtraceContext = buildMsgContext(ctx, requestHeader);
// 发送消息前的逻辑
this.executeSendMessageHookBefore(ctx, request, mqtraceContext);
if (requestHeader.isBatch())
return this.asyncSendBatchMessage(ctx, request, mqtraceContext, requestHeader);
else
return this.asyncSendMessage(ctx, request, mqtraceContext, requestHeader);
/**
* ......
**/
根据源码可以看出,首先解析发送消息的请求SendMessageRequestHeader
,然后调用asyncSend(Batch)Message
方法进行消息的发送。
该类提供了发送或接收消息的钩子函数:如果发送消息,则调用sendMessage
方法,如果是接收消息则调用pullMessage
拉取消息的方法。
sendMessage
消息发送给Broker
服务器时,调用的是sendMessage
方法接收并存储消息,主要流程已在代码片注释中给出:
private CompletableFuture<RemotingCommand> asyncSendMessage(ChannelHandlerContext ctx, RemotingCommand request,
SendMessageContext mqtraceContext,
SendMessageRequestHeader requestHeader)
// 初始化响应
final RemotingCommand response = preSend(ctx, request, requestHeader);
// 构建响应头
final SendMessageResponseHeader responseHeader = (SendMessageResponseHeader)response.readCustomHeader();
if (response.getCode() != -1)
return CompletableFuture.completedFuture(response);
final byte[] body = request.getBody();
int queueIdInt = requestHeader.getQueueId();
TopicConfig topicConfig = this.brokerController.getTopicConfigManager().selectTopicConfig(requestHeader.getTopic());
if (queueIdInt < 0)
queueIdInt = randomQueueId(topicConfig.getWriteQueueNums());
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
if (!handleRetryAndDLQ(requestHeader, response, request, msgInner, topicConfig))
return CompletableFuture.completedFuture(response);
// 设置消息体数据
msgInner.setBody(body);
msgInner.setFlag(requestHeader.getFlag());
Map<String, String> origProps = MessageDecoder.string2messageProperties(requestHeader.getProperties());
MessageAccessor.setProperties(msgInner, origProps);
msgInner.setBornTimestamp(requestHeader.getBornTimestamp());
msgInner.setBornHost(ctx.channel().remoteAddress());
msgInner.setStoreHost(this.getStoreHost());
msgInner.setReconsumeTimes(requestHeader.getReconsumeTimes() == null ? 0 : requestHeader.getReconsumeTimes());
// 获取Broker集群名称
String clusterName = this.brokerController.getBrokerConfig().getBrokerClusterName();
MessageAccessor.putProperty(msgInner, MessageConst.PROPERTY_CLUSTER, clusterName);
// 同步等待消息存储成功
if (origProps.containsKey(MessageConst.PROPERTY_WAIT_STORE_MSG_OK))
String waitStoreMsgOKValue = origProps.remove(MessageConst.PROPERTY_WAIT_STORE_MSG_OK);
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
origProps.put(MessageConst.PROPERTY_WAIT_STORE_MSG_OK, waitStoreMsgOKValue);
else
// 异步
msgInner.setPropertiesString(MessageDecoder.messageProperties2String(msgInner.getProperties()));
CompletableFuture<PutMessageResult> putMessageResult = null;
String transFlag = origProps.get(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if (transFlag != null && Boolean.parseBoolean(transFlag))
// Broker拒绝接收消息
if (this.brokerController.getBrokerConfig().isRejectTransactionMessage())
response.setCode(ResponseCode.NO_PERMISSION);
response.setRemark(
"the broker[" + this.brokerController.getBrokerConfig().getBrokerIP1()
+ "] sending transaction message is forbidden");
return CompletableFuture.completedFuture(response);
putMessageResult = this.brokerController.getTransactionalMessageService().asyncPrepareMessage(msgInner);
else
putMessageResult = this.brokerController.getMessageStore().asyncPutMessage(msgInner);
return handlePutMessageResultFuture(putMessageResult, response, request, msgInner, responseHeader, mqtraceContext, ctx, queueIdInt);
以上是关于RocketMQ Broker消息处理流程及部分源码解析的主要内容,如果未能解决你的问题,请参考以下文章
RocketMQ源码(11)—Broker asyncPutMessage处理消息以及存储的高性能设计一万字