RocketMQ源码之 consumer是怎样消费消息的

Posted chuliang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码之 consumer是怎样消费消息的相关的知识,希望对你有一定的参考价值。

DefaultMQPushConsumer(以push为例).start()调用mQClientFactory.start(),其中pullMessageService.start在一个循环中从pullRequestQueue中获取pullRequest,

执行pullMessage方法,调用的是remotingClient的invokeAsync方法,传入一个封装了PullCallback的回调函数,等响应到来的时候,PullCallback

的onSuccess方法processQueue.putMessage先把消息放进队列,再由consumeMessageService(实际是一个线程池,分为orderly和concurrently

两种消费模式,具体由自己的内部类ConsumeRequest分别实现)消费,最后调用executePullRequestImmediately把新的pullRequest放入

上面说的pullRequestQueue种,保证循环的去broker获取新的消息,可以看出push其实还是pull。

有个问题就是:pullRequestQueue中第一个pullRequest是什么时候放进去的?

上面说的pullMessageService.start的下面有this.rebalanceService.start(),run方法的循环中,countDownlatch等待,被唤醒后会doRebalance一直

到dispatchPullRequest再到熟悉的上面说的executePullRequestImmediately放入pullRequest。而mQClientFactory.start()方法的下面有

this.mQClientFactory.rebalanceImmediately(),里面即时放行countDownlatch。

doRebalance

以上是关于RocketMQ源码之 consumer是怎样消费消息的的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ之Consumer

深度挖掘 RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)

RocketMQ源码(15)—消费者DefaultMQPushConsumer启动主要流程源码

4RocketMQ 源码解析之 网络通信 Netty

4RocketMQ 源码解析之 网络通信 Netty

消息中间件之RocketMQ简介