RocketMQ ConsumeQueue(消费队列)
Posted 乐观男孩
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ ConsumeQueue(消费队列)相关的知识,希望对你有一定的参考价值。
目录
说明
RocketMQ对新消息是顺序存在MapperFile上,但不同的Consumer需要订阅不同topic的数据,针对不同的Consumer,如果从原始数据进行数据拉取,这样效率非常低,有可能扫描完所有的MapperFile后,只有几条消息可供消费的。RocketMQ为了解决这种问题,引入了ConsumeQueue消费队列的概念:针对每一条消息,都会按topic和queueId进行分类。Consumer进行拉取消息时,Broker先从ConsumerQueue获取到待消费消息在CommitLog上的offset,根据offset从CommitLog中获取完整的消息。
ReputMessageService,会不停地检测是否有新的消息还未保存到ConsumerQueue中,当发现某条消息还未保存时,会将保存的任务分发给CommitLogDispatcherBuildConsumeQueue,然后保存到ConsumeQueue中。所以流程图大概如下:
ConsumerQueue建立过程
1、ReputMessageService不断的从CommitLog中获取未添加到ConsumerQueue的消息
2、获取到未添加到ConsumerQueue的消息后,会进行分发
分发的对象是从dispatcherList获取的,dispatcherList在DefaultMessageStore实例化时已经进行了初始化。这里只需要关注CommitLogDispatcherBuildConsumeQueue。
3、CommitLogDispatcherBuildConsumeQueue根据topic和queueId获取到对应的ConsumerQueue
调用putMessagePositionInfoWrapper方法进行处理
调用putMessagePositionInfo进行处理
组装内容,刷新数据到磁盘文件
ConsumerQueue文件结构
总结
1、ConsumeQueue每条数据占20字节空间,包含三部分内容:消息的offset、消息大小size、tag的hashCode。单个ConsumeQueue文件最多保存30W条数据。
2、ConsumeQueue文件保存在$storePath/topic/queueId/目录下。
以上是关于RocketMQ ConsumeQueue(消费队列)的主要内容,如果未能解决你的问题,请参考以下文章