Rocketmq消息持久化
Posted tjc123
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rocketmq消息持久化相关的知识,希望对你有一定的参考价值。
本文编写,参考:https://my.oschina.net/bieber/blog/725646
producer Send()的Message最终将由broker处理,处理类为:SendMessageProcessor ,处理方法:processRequet.
public class SendMessageProcessor extends AbstractSendMessageProcessor implements NettyRequestProcessor {
private List<ConsumeMessageHook> consumeMessageHookList;
public SendMessageProcessor(final BrokerController brokerController) {
super(brokerController);
}
@Override
public RemotingCommand processRequest(ChannelHandlerContext ctx, RemotingCommand request) throws RemotingCommandException {}
上述方法,并不是直接处理消息,而是交由MessageStore处理,相关代码如下:
MessageExtBrokerInner msgInner = new MessageExtBrokerInner();
msgInner.setTopic(requestHeader.getTopic());
msgInner.setQueueId(queueIdInt);
//......
PutMessageResult putMessageResult = this.brokerController.getMessageStore().putMessage(msgInner);
然而MessageStore也不直接持久化消息,转交给 CommitLog
long beginTime = this.getSystemClock().now();
PutMessageResult result = this.commitLog.putMessages(messageExtBatch);
从MappedFileQueue中取出最新的一条:
MappedFile mappedFile = this.mappedFileQueue.getLastMappedFile();
//写消息
result = mappedFile.appendMessages(messageExtBatch, this.appendMessageCallback);
//持久化到磁盘
handleDiskFlush(result, putMessageResult, messageExtBatch);
handleHA(result, putMessageResult, messageExtBatch);
cousumer 从broker读消息。
以上是关于Rocketmq消息持久化的主要内容,如果未能解决你的问题,请参考以下文章
SpringCloud系列十一:SpringCloudStream(SpringCloudStream 简介创建消息生产者创建消息消费者自定义消息通道分组与持久化设置 RoutingKey)(代码片段