RocketMQ ConsumeQueue(消费队列)

Posted 乐观男孩

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ ConsumeQueue(消费队列)相关的知识,希望对你有一定的参考价值。

目录

说明

RocketMQ对新消息是顺序存在MapperFile上,但不同的Consumer需要订阅不同topic的数据,针对不同的Consumer,如果从原始数据进行数据拉取,这样效率非常低,有可能扫描完所有的MapperFile后,只有几条消息可供消费的。RocketMQ为了解决这种问题,引入了ConsumeQueue消费队列的概念:针对每一条消息,都会按topic和queueId进行分类。Consumer进行拉取消息时,Broker先从ConsumerQueue获取到待消费消息在CommitLog上的offset,根据offset从CommitLog中获取完整的消息。
ReputMessageService,会不停地检测是否有新的消息还未保存到ConsumerQueue中,当发现某条消息还未保存时,会将保存的任务分发给CommitLogDispatcherBuildConsumeQueue,然后保存到ConsumeQueue中。所以流程图大概如下:

ConsumerQueue建立过程

1、ReputMessageService不断的从CommitLog中获取未添加到ConsumerQueue的消息

2、获取到未添加到ConsumerQueue的消息后,会进行分发

分发的对象是从dispatcherList获取的,dispatcherList在DefaultMessageStore实例化时已经进行了初始化。这里只需要关注CommitLogDispatcherBuildConsumeQueue。

3、CommitLogDispatcherBuildConsumeQueue根据topic和queueId获取到对应的ConsumerQueue

调用putMessagePositionInfoWrapper方法进行处理

调用putMessagePositionInfo进行处理
组装内容,刷新数据到磁盘文件

ConsumerQueue文件结构

总结

1、ConsumeQueue每条数据占20字节空间,包含三部分内容:消息的offset、消息大小size、tag的hashCode。单个ConsumeQueue文件最多保存30W条数据。
2、ConsumeQueue文件保存在$storePath/topic/queueId/目录下。

以上是关于RocketMQ ConsumeQueue(消费队列)的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ ConsumeQueue(消费队列)

【rocketmq客户端】订阅关系一致

rocketmq的broker接收消息的时候,如何更新consumeQueue和indexfile的

RocketMQ消息存储原理

RocketMQ事务机制的底层实现原理解析

mq消息存储