深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
Posted 洛神灬殇
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)相关的知识,希望对你有一定的参考价值。
承接上文
承接上一章节的内容,下面我们看继续看拉取的调度模式,PULL与PUSH模式相比,PULL模式需要应用层不间断地进行拉取消息然后再执行消费处理,提高了应用层的编码复杂度,为了Pull方式的编程复杂度,RocketMQ提供了调度消费服务(MQPullConsumerScheduleService),在topic的订阅发送变化(初次订阅或距上次拉取消息超时)就触发PULL方式拉取消息。
MQPullConsumerScheduleService
MQPullConsumerScheduleService是PULL模式下面的调度服务,当RebalanceImpl.processQueueTable队列有变化时才进行消息的拉取,从而降低Pull方式的编程复杂度。在应用层按照如下方式使用:
使用MQPullConsumerScheduleService开发消费消息
实例化对象MQPullConsumerScheduleService
final MQPullConsumerScheduleService scheduleService = new MQPullConsumerScheduleService("GroupName1");
设置NameServer
scheduleService.getDefaultMQPullConsumer().setNamesrvAddr("127.0.0.1:9876");
设置消费组为集群模式
scheduleService.setMessageModel(MessageModel.CLUSTERING);
注册拉取回调函数
scheduleService.registerPullTaskCallback("TopicTest", new PullTaskCallback()
@Override
public void doPullTask(MessageQueue mq, PullTaskContext context)
MQPullConsumer consumer = context.getPullConsumer();
try
long offset = consumer.fetchConsumeOffset(mq, false);
if (offset < 0)
offset = 0;
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
System.out.printf("%s%n", offset + "\\t" + mq + "\\t" + pullResult);
switch (pullResult.getPullStatus())
case FOUND:
break;
case NO_MATCHED_MSG:
break;
case NO_NEW_MSG:
case OFFSET_ILLEGAL:
break;
default:
break;
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
context.setPullNextDelayTimeMillis(100);
catch (Exception e)
e.printStackTrace();
);
从上下文中获取MQPullConsumer对象,此处其实就是DefaultMQPullConsumer。
MQPullConsumer consumer = context.getPullConsumer();
获取该消费组的该队列的消费进度
long offset = consumer.fetchConsumeOffset(mq, false);
拉取消息,pull()方法在DefaultMQPullConsumer有具体介绍
PullResult pullResult = consumer.pull(mq, "*", offset, 32);
更新消费组该队列消费进度
consumer.updateConsumeOffset(mq, pullResult.getNextBeginOffset());
设置下次拉取消息时间间隔,单位毫秒
context.setPullNextDelayTimeMillis(100);
启动调度组件,调用MQPullConsumerScheduleService.start()方法启动该调度服务。
scheduleService.start();
- 首先初始化队列监听器MessageQueueListenerImpl类,该类是MQPullConsumerScheduleService的内部类,实现了MessageQueueListener接口的messageQueueChanged方法;
- 将该监听器类赋值给DefaultMQPullConsumer.messageQueueListener变量值;
- 调用DefaultMQPullConsumer的start方法启动Consumer;
分析核心执行方法及流程
-
使用registerPullTaskCallback对Topic进行注册
-
MQPullConsumerScheduleService 会将Topic的每个队列以及相应的 doPullTask() 实现放入名为 taskTable 的Hash表中。
-
线程池 scheduledThreadPoolExecutor 会不断的调用每个队列的 doPullTask() 函数。
-
在 doPullTask() 完成自己的拉取消息逻辑,和DefaultMQPullConsumer是一样的。
-
用户设置下次调用间隔时间
-
scheduledThreadPoolExecutor 等待该间隔时间后,再次调用 doPullTask() 方法。
注册拉取任务回调函数
/**
* @param topic topic名称
* @param callback 回调函数
*/
public void registerPullTaskCallback(final String topic, final PullTaskCallback callback)
this.callbackTable.put(topic, callback);
this.defaultMQPullConsumer.registerMessageQueueListener(topic, null);
PullTaskCallback回调函数接口
调用MQPullConsumerScheduleService.registerPullTaskCallback (String topic, PullTaskCallback callback)方法,在该方法中以topic为key值将自定义的PullTaskCallback 对象存入MQPullConsumerScheduleService. callbackTable:ConcurrentHashMap<String ,PullTaskCallback>变量中;
public interface PullTaskCallback
/**
*
* @param mq 消息队列
* @param context 任务上下文
*/
void doPullTask(final MessageQueue mq, final PullTaskContext context);
建立PullTaskCallback接口的实现类,实现该接口的doPullTask(final MessageQueue mq, final PullTaskContext context)方法。
在该方法中可以先调用DefaultMQPullConsumer.fetchConsumeOffset (MessageQueue mq, boolean fromStore)方法获取MessageQueue队列的消费进度。
PullTaskContext拉取任务上下文
调用DefaultMQPullConsumer.pull(MessageQueue mq, String subExpression, long offset, int maxNums)方法,
- 指定的队列和指定的开始位置读取消息内容;
- 获取到的消息进行相关的业务逻辑处理;
public class PullTaskContext
private int pullNextDelayTimeMillis = 200;
// 使用该接口进行消息拉取,默认实现是DefaultMQPullConsumer
private MQPullConsumer pullConsumer;
public int getPullNextDelayTimeMillis()
return pullNextDelayTimeMillis;
/**
* 设置下次调用doPullTask()的间隔时间,默认毫秒
*/
public void setPullNextDelayTimeMillis(int pullNextDelayTimeMillis)
this.pullNextDelayTimeMillis = pullNextDelayTimeMillis;
public MQPullConsumer getPullConsumer()
return pullConsumer;
public void setPullConsumer(MQPullConsumer pullConsumer)
this.pullConsumer = pullConsumer;
- 调用DefaultMQPullConsumer.updateConsumeOffset(MessageQueue mq, long offset)方法进行消费进度的更新,其中offset值是在获取消息内容时返回的下一个消费进度值;
MQPullConsumerScheduleService的实现原理
触发拉取消息
RebalanceImpl.rebalanceByTopic()方法执行的过程中,若RebalanceImpl.processQueueTable有变化,则回调DefaultMQPullConsumer. messageQueueListener变量值的MessageQueueListenerImpl. MessageQueueChanged方法,在该方法中调用MQPullConsumerScheduleService. putTask(String topic, Set mqNewSet)方法。
-
若为广播模式(BROADCASTING),则mqNewSet为该topic下面的所有MessageQueue队列;
-
若为集群模式,则mqNewSet为给该topic分配的MessageQueue队列,putTask方法的大致逻辑如下:
-
遍历
MQPullConsumerScheduleService.taskTable: ConcurrentHashMap<MessageQueue, PullTaskImpl>
列表(表示正在拉取消息的任务列表),检查该topic下面的所有MessageQueue对象,若该对象不在入参mqNewSet集合中的,将对应的PullTaskImpl对象的cancelled变量标记为true。 -
mqNewSet集合中的MessageQueue对象,若不在MQPullConsumerScheduleService.taskTable列表中,则以MessageQueue对象为参数初始化PullTaskImpl对象,然后放入taskTable列表中,将该PullTaskImpl对象放入
MQPullConsumerScheduleService.scheduledThreadPoolExecutor
线程池中,然后立即执行该线程。
-
拉取消息的线程(PullTaskImpl)
该PullTaskImpl线程的run方法如下:
-
检查cancelled变量是为true,若为false则直接退出该线程;否则继续下面的处理;
-
以MessageQueue对象的topic值从MQPullConsumerScheduleService.callbackTable变量中获取PullTaskCallback的实现类(该类是由应用层实现);
3, 调用该PullTaskCallback实现类的doPullTask方法,即实现业务层定义的业务逻辑(通用逻辑是先获取消息内容,然后进行相应的业务处理,最后更新消费进度);
4, 再次检查cancelled变量是为true,若不为true,则将该PullTaskImpl对象再次放入MQPullConsumerScheduleService. scheduledThreadPoolExecutor线程池中,设定在200毫秒之后重新调度执行PullTaskImpl线程类;
以上是关于深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)的主要内容,如果未能解决你的问题,请参考以下文章
深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)
深度挖掘RocketMQ底层源码「底层问题分析系列」深度挖掘RocketMQ底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]
深度挖掘RocketMQ底层源码「底层问题分析系列」深度挖掘RocketMQ底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]
深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)
深度挖掘 RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)
精华推荐 | 深入浅出 RocketMQ原理及实战「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)