深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)

Posted 洛神灬殇

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)相关的知识,希望对你有一定的参考价值。

精华推荐 | 【深入浅出 RocketMQ原理及实战】「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程

  • 上篇:分析对应总体消费流程的判断和校验以及限流控制和回调等处理流程分析

  • 下篇:分析基于上篇的总体流程的底层的消息通讯以及拉去处理数据传输流程分析

RocketMQ的消息模型

RocketMQ的基础消息模型是发布-订阅(Pub/Sub)是一种消息范式,消息的发送者(称为发布者、生产者、Producer)会将消息直接发送给特定的接收者(称为订阅者、消费者、Comsumer),如下图所示。

消息通过生产者发送到某一个Topic,如果需要订阅该Topic并消费里面的消息的话,就要创建对应的消费者进行消费,而本文主要会进行介绍对应的消息队列的消费者。

本文主旨

本文主要会针对于RocketMQ的消费者Consumer的功能原理进行分析和介绍,消费者主要会通过以推(push),拉(pull)两种模式对消息进行消费。同时也支持集群方式和广播方式的消费。提供实时消息订阅机制,可以满足大多数用户的需求。

RocketMQ提供Push模式也提供了Pull模式

MQ的消费模式可以大致分为两种,一种是推Push,一种是拉Pull。

Push模式处理消费消费

Push是服务端主动推送消息给客户端,优点是及时性较好,但如果客户端没有做好流控,一旦服务端推送大量消息到客户端时,就会导致客户端消息堆积甚至崩溃。

DefaultMQPushConsumer的使用和初始化

Push模式主要通过初始化DefaultMQPushConsumer对象进行消费数据信息,案例代码如下所示。

DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumer_group");
// 设置NameServer地址 
consumer.setNamesrvAddr("localhost:9876");
//订阅一个或多个topic,并指定tag过滤条件,这里指定*表示接收所有tag的消息
consumer.subscribe("TopicTest", "*");
//注册回调接口来处理从Broker中收到的消息
consumer.registerMessageListener(new MessageListenerConcurrently() 
 @Override
 public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) 
        // 返回消息消费状态,ConsumeConcurrentlyStatus.CONSUME_SUCCESS为消费成功
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
  
);
// 启动Consumer
consumer.start();
消费的位点配置

消费端的消费的位点计算值,可以在启动前进行配置,主要方法可以通过下面代码进行配置。

consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
  • CONSUME_FROM_LAST_OFFSET:第一次启动从队列最后位置消费,后续再启动接着上次消费的进度开始消费
  • CONSUME_FROM_FIRST_OFFSET:第一次启动从队列初始位置消费,后续再启动接着上次消费的进度开始消费
  • CONSUME_FROM_TIMESTAMP:第一次启动从指定时间点位置消费,后续再启动接着上次消费的进度开始消费。

注意:第一次启动是指从来没有消费过消息的消费者,如果该消费者消费过,那么会在broker端记录该消费者的消费位置,如果该消费者挂了再启动,那么自动从上次消费的进度开始

消费的模式的配置

消费模式主要分为:集群消费(Clustering)和广播消费(Broadcasting)这两种。

  • 集群消费模式下,相同Consumer Group的每个Consumer实例平均分摊消息。
  • 广播消费模式下,相同Consumer Group的每个Consumer实例都接收全量的消息。
consumer.setMessageModel(MessageModel.BROADCASTING);
  • CLUSTERING:默认模式,同一个ConsumerGroup,每个consumer只消费所订阅消息的一部分内容,同一个ConsumerGroup里所有的Consumer消息加起来才是所订阅topic整体,从而达到负载均衡的目的。
  • BROADCASTING:同一个ConsumerGroup每个consumer都消费到所订阅topic所有消息,也就是一个消费会被多次分发,被多个consumer消费。
DefaultMQPushConsumer的运行原理和流程

DefaultMQPushConsumerImpl中各个对象的主要功能如下:

平衡和分配队列组件实现类-RebalancePushImpl

RebalancePushImpl:主要负责进行分配对应当前服务实例的消费者会从当前消费的topic中的那个Queue中进行消费消息;此外当消费者宕机或者下线的时候,还会执行rebalance再次平衡和分配给其他消费者对应的队列控制。

长连接进行拉去消息组件实现类-PullAPIWrapper

PullAPIWrapper:主要与broker服务端建立长连接,一直进行定时从broker服务端处拉取消息数据,默认为:32条消息,之后还会调用ConsumeMessageService实现类,进行用户注册的Listener执行消息消费逻辑。

看一下consumer.registerMessageListener的源码,如下所示。

/**
 * Register a callback to execute on message arrival for concurrent consuming.
 * @param messageListener message handling callback.
 */
 @Override
 public void registerMessageListener(MessageListenerConcurrently messageListener) 
    this.messageListener = messageListener;
    this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);
 
回调用户的注册的MessageListener组件实现类-ConsumeMessageService

ConsumeMessageService:实现所谓的"Push-被动"消费机制;从Broker拉取的消息后,封装成ConsumeRequest提交给ConsumeMessageSerivce,此service负责回调用户的Listener消费消息。

存储Offset的消费记录的位移组件实现类–OffsetStore

OffsetStore:维护当前consumer的消费记录(offset);有两种实现,Local和Rmote,Local存储在本地磁盘上,适用于BROADCASTING广播消费模式;而Remote则将消费进度存储在Broker上,适用于CLUSTERING集群消费模式;

综合门面功能接口供各个Service组件实现类–MQClientFactory

MQClientFactory:负责管理client(consumer、producer),并提供多中功能接口供各个Service(Rebalance、PullMessage等)调用;大部分逻辑均在这个类中完成,总体流程架构如下图所示。

DefaultMQPushConsumerImpl的start方法的源码

我们先来看一下对应的DefaultMQPushConsumerImpl类的start方法源码,源码可以看出主要实现过程在consumer.start后调用DefaultMQPushConsumerImpl的同步start方法,如下所示。

 public synchronized void start() throws MQClientException 
        switch (this.serviceState) 
            case CREATE_JUST:
                log.info("the consumer [] start beginning. messageModel=, isUnitMode=", this.defaultMQPushConsumer.getConsumerGroup(),
                    this.defaultMQPushConsumer.getMessageModel(), this.defaultMQPushConsumer.isUnitMode());
                this.serviceState = ServiceState.START_FAILED;
                this.checkConfig();
                this.copySubscription();
                if (this.defaultMQPushConsumer.getMessageModel() == MessageModel.CLUSTERING) 
                    this.defaultMQPushConsumer.changeInstanceNameToPID();
                
                this.mQClientFactory = MQClientManager.getInstance().getAndCreateMQClientInstance(this.defaultMQPushConsumer, this.rpcHook);
                this.rebalanceImpl.setConsumerGroup(this.defaultMQPushConsumer.getConsumerGroup());
                this.rebalanceImpl.setMessageModel(this.defaultMQPushConsumer.getMessageModel());
                this.rebalanceImpl.setAllocateMessageQueueStrategy(this.defaultMQPushConsumer.getAllocateMessageQueueStrategy());
                this.rebalanceImpl.setmQClientFactory(this.mQClientFactory);
                this.pullAPIWrapper = new PullAPIWrapper(
                    mQClientFactory,
                    this.defaultMQPushConsumer.getConsumerGroup(), isUnitMode());
                this.pullAPIWrapper.registerFilterMessageHook(filterMessageHookList);
                if (this.defaultMQPushConsumer.getOffsetStore() != null) 
                    this.offsetStore = this.defaultMQPushConsumer.getOffsetStore();
                 else 
                    switch (this.defaultMQPushConsumer.getMessageModel()) 
                        case BROADCASTING:
                            this.offsetStore = new LocalFileOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        case CLUSTERING:
                            this.offsetStore = new RemoteBrokerOffsetStore(this.mQClientFactory, this.defaultMQPushConsumer.getConsumerGroup());
                            break;
                        default:
                            break;
                    
                    this.defaultMQPushConsumer.setOffsetStore(this.offsetStore);
                
                this.offsetStore.load();
                if (this.getMessageListenerInner() instanceof MessageListenerOrderly) 
                    this.consumeOrderly = true;
                    this.consumeMessageService =
                        new ConsumeMessageOrderlyService(this, (MessageListenerOrderly) this.getMessageListenerInner());
                 else if (this.getMessageListenerInner() instanceof MessageListenerConcurrently) 
                    this.consumeOrderly = false;
                    this.consumeMessageService =
                        new ConsumeMessageConcurrentlyService(this, (MessageListenerConcurrently) this.getMessageListenerInner());
                
                this.consumeMessageService.start();
                boolean registerOK = mQClientFactory.registerConsumer(this.defaultMQPushConsumer.getConsumerGroup(), this);
                if (!registerOK) 
                    this.serviceState = ServiceState.CREATE_JUST;
                    this.consumeMessageService.shutdown();
                    throw new MQClientException("The consumer group[" + this.defaultMQPushConsumer.getConsumerGroup()
                        + "] has been created before, specify another name please." + FAQUrl.suggestTodo(FAQUrl.GROUP_NAME_DUPLICATE_URL),
                        null);
                
                mQClientFactory.start();
                log.info("the consumer [] start OK.", this.defaultMQPushConsumer.getConsumerGroup());
                this.serviceState = ServiceState.RUNNING;
                break;
            case RUNNING:
            case START_FAILED:
            case SHUTDOWN_ALREADY:
                throw new MQClientException("The PushConsumer service state not OK, maybe started once, "
                    + this.serviceState
                    + FAQUrl.suggestTodo(FAQUrl.CLIENT_SERVICE_NOT_OK),
                    null);
            default:
                break;
        
        this.updateTopicSubscribeInfoWhenSubscriptionChanged();
        this.mQClientFactory.checkClientInBroker();
        this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();
        this.mQClientFactory.rebalanceImmediately();

DefaultMQPushConsumerImpl的start方法的源码

主要我们能关注重点的代码和组件的start,通过mQClientFactory.start();发我们发现他调用了很多组件的start方法:

  • this.mQClientAPIImpl.start():主要用于开启请求-响应的网络通道对象。
  • this.startScheduledTask():主要开启多个定时任务的功能
  • this.pullMessageService.start():主要开启拉取数据的业务组件
  • this.rebalanceService.start():主要开启rebalance业务服务组件。
  • this.defaultMQProducer.getDefaultMQProducerImpl().start(false):开启push服务的对象组件作为门面。
public void start() throws MQClientException 
        synchronized (this) 
            switch (this.serviceState) 
                case CREATE_JUST:
                    this.serviceState = ServiceState.START_FAILED;
                    // If not specified,looking address from name server
                    if (null == this.clientConfig.getNamesrvAddr()) 
                        this.mQClientAPIImpl.fetchNameServerAddr();
                    
                    // Start request-response channel
                    this.mQClientAPIImpl.start();
                    // Start various schedule tasks
                    this.startScheduledTask();
                    // Start pull service
                    this.pullMessageService.start();
                    // Start rebalance service
                    this.rebalanceService.start();
                    // Start push service
                    this.defaultMQProducer.getDefaultMQProducerImpl().start(false);
                    log.info("the client factory [] start OK", this.clientId);
                    this.serviceState = ServiceState.RUNNING;
                    break;
                case RUNNING:
                    break;
                case SHUTDOWN_ALREADY:
                    break;
                case START_FAILED:
                    throw new MQClientException("The Factory object[" + this.getClientId() + "] has been created before, and failed.", null);
                default:
                    break;
            
        
    

重点我们先来主要看pullMessageService.start(),通过这里我们发现RocketMQ的Push模式底层其实也是通过pull实现的,接下来我们先来分析一下pullMessageService中的pullMessage方法的源码。

private void pullMessage(final PullRequest pullRequest) 
        final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());
        if (consumer != null) 
            DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
            impl.pullMessage(pullRequest);
         else 
            log.warn("No matched consumer for the PullRequest , drop it", pullRequest);
        

DefaultMQPushConsumerImpl的pullMessage方法的源码

源码中需要进行根据消费组进行筛选对应的消费组,以方便选对应的消费组件DefaultMQPushConsumerImpl,如下图所示。

final MQConsumerInner consumer = this.mQClientFactory.selectConsumer(pullRequest.getConsumerGroup());

最后还是通过DefaultMQPushConsumerImpl类的pullMessage方法来进行消息的逻辑处理,

DefaultMQPushConsumerImpl impl = (DefaultMQPushConsumerImpl) consumer;
impl.pullMessage(pullRequest);
DefaultMQPushConsumerImpl的逻辑限流控制流程

总结一下针对于限流的总体流程控制:

  1. 首先拉去消息数据的时候会先去判断对应的ProcessQueue的对象元素是否还存在订阅关系或者被删除了,从而进行跳过那些不应该被消费的数据。
final ProcessQueue processQueue = pullRequest.getProcessQueue();
if (processQueue.isDropped()) 
     log.info("the pull request[] is dropped.", pullRequest.toString());
     return;

上面的逻辑是先会判断和校验PullRequest对象中的ProcessQueue对象的dropped是否为true(在RebalanceService线程中为topic下的MessageQueue创建拉取消息请求时要维护对应的ProcessQueue对象,若Consumer不再订阅该topic则会将该对象的dropped置为true);若是则认为该请求是已经取消的,则直接跳出该方法。


  1. 更新PullRequest对象中的ProcessQueue对象的时间戳(ProcessQueue.lastPullTimestamp)为当前时间戳。此外会判断当前的Consumer消费者组件是否运行中,主要是通过DefaultMQPushConsumerImpl.serviceState是否为RUNNING。
pullRequest.getProcessQueue().setLastPullTimestamp(System.currentTimeMillis());
try 
    this.makeSureStateOK();
 catch (MQClientException e) 
    log.warn("pullMessage exception, consumer state not ok", e);
    this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_EXCEPTION);
    return;

if (this.isPause()) 
  log.warn("consumer was paused, execute pull request later. instanceName=, group=", this.defaultMQPushConsumer.getInstanceName(), this.defaultMQPushConsumer.getConsumerGroup());
  this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_SUSPEND);
  return;

如果运行状态或者是暂停状态this.isPause()=false(DefaultMQPushConsumerImpl.pause=true),则会进行执行PullMessageService.executePullRequestLater(PullRequest pullRequest, long timeDelay)方法延迟再拉取消息,其中timeDelay=3000;

该方法的目的是在3秒之后再次将该PullRequest对象放入PullMessageService. pullRequestQueue队列中;并跳出该方法


  1. 主要进行消费者端进行速度和控制消费速度的流控。若ProcessQueue对象的msgCount大于了消费端的流控阈值,默认值为1000,主要通过DefaultMQPushConsumer.pullThresholdForQueue的执行进行判断。当调用的processQueue.getMsgCount().get()的数值大于DefaultMQPushConsumer.pullThresholdForQueue 的值时候会进行 PullMessageService.executePullRequestLater方法。
long cachedMessageCount = processQueue.getMsgCount().get();
        long cachedMessageSizeInMiB = processQueue.getMsgSize().get() / (1024 * 1024);
        if (cachedMessageCount > this.defaultMQPushConsumer.getPullThresholdForQueue()) 
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) 
                log.warn(
                    "the cached message count exceeds the threshold , so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
                    this.defaultMQPushConsumer.getPullThresholdForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            
            return;
        
  1. 主要进行消费者端进行速度和控制消费速度的流控。主要会通过 this.defaultMQPushConsumer.getPullThresholdSizeForQueue()与进行计算消息的内存空间的总大小进行对比,单位是M,当大于系统定义的 this.defaultMQPushConsumer.getPullThresholdSizeForQueue()的阈值大小的时候,则会进行限流处理。
        if (cachedMessageSizeInMiB > this.defaultMQPushConsumer.getPullThresholdSizeForQueue()) 
            this.executePullRequestLater(pullRequest, PULL_TIME_DELAY_MILLS_WHEN_FLOW_CONTROL);
            if ((queueFlowControlTimes++ % 1000) == 0) 
                log.warn(
                    "the cached message size exceeds the threshold  MiB, so do flow control, minOffset=, maxOffset=, count=, size= MiB, pullRequest=, flowControlTimes=",
                    this.defaultMQPushConsumer.getPullThresholdSizeForQueue(), processQueue.getMsgTreeMap().firstKey(), processQueue.getMsgTreeMap().lastKey(), cachedMessageCount, cachedMessageSizeInMiB, pullRequest, queueFlowControlTimes);
            
            return;
        

以上3和4步骤中的(DELAY_MILLS_WHEN_FLOW_CONTROL)50毫秒之后,才会将该PullRequest请求放入PullMessageService.pullRequestQueue队列中。从而实现看限流的能力。


    <

    以上是关于深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)的主要内容,如果未能解决你的问题,请参考以下文章

    深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)

    深度挖掘RocketMQ底层源码「底层问题分析系列」深度挖掘RocketMQ底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]

    深度挖掘RocketMQ底层源码「底层问题分析系列」深度挖掘RocketMQ底层那些导致消息丢失的汇总盘点透析([REJECTREQUEST]

    深度挖掘RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行调度的流程(Pull模式)

    深度挖掘 RocketMQ底层源码「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(Pull模式-上)

    精华推荐 | 深入浅出 RocketMQ原理及实战「底层源码挖掘系列」透彻剖析贯穿RocketMQ的消费者端的运行核心的流程(上篇)