RocketMQ源码(22)—ConsumeMessageOrderlyService顺序消费消息源码

Posted 刘Java

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码(22)—ConsumeMessageOrderlyService顺序消费消息源码相关的知识,希望对你有一定的参考价值。

基于RocketMQ release-4.9.3,深入的介绍了ConsumeMessageOrderlyService顺序消费消息源码。

此前我们学习了consumer消息的拉取流程源码:

  1. RocketMQ源码(18)—DefaultMQPushConsumer消费者发起拉取消息请求源码
  2. RocketMQ源码(19)—Broker处理DefaultMQPushConsumer发起的拉取消息请求源码【一万字】
  3. RocketMQ源码(20)—DefaultMQPushConsumer处理Broker的拉取消息响应源码

当前DefaultMQPushConsumer拉取到消息之后,会将消息提交到对应的processQueue处理队列内部的msgTreeMap中。然后通过consumeMessageService#submitConsumeRequest方法将拉取到的消息构建为ConsumeRequest,然后通过内部的consumeExecutor线程池消费消息。

consumeMessageService有ConsumeMessageConcurrentlyService并发消费和ConsumeMessageOrderlyService顺序消费两种实现,下面我们来看看这两种实现如何消费消息,
此前我们已经学习了并发消费的源码:https://blog.csdn.net/weixin_43767015/article/details/129017181,本次我们先学习顺序消费的源码。

文章目录

1 start启动服务定时锁定消息队列

consumeMessageService服务在DefaultMQPushConsumerImpl#start方法中被初始化并启动,即调用start方法。该方法将会通过scheduledExecutorService定时任务锁定所有分配的mq,保证同时只有一个消费端可以消费。

这个定时任务在启动1s后开始执行,后每20s执行一次,这里的20s可通过**-D rocketmq.client.rebalance.lockInterval**属性设置。因此它的频率与负载均衡的默认频率一致,都是20s。

/**
 * ConsumeMessageOrderlyService的方法
 * 启动服务
 */
public void start() 
    //如果是集群模式
    if (MessageModel.CLUSTERING.equals(ConsumeMessageOrderlyService.this.defaultMQPushConsumerImpl.messageModel())) 
        //启动一个定时任务,启动后1s执行,后续每20s执行一次
        //尝试对所有分配给当前consumer的队列请求broker端的消息队列锁,保证同时只有一个消费端可以消费。
        this.scheduledExecutorService.scheduleAtFixedRate(new Runnable() 
            @Override
            public void run() 
                try 
                    //定期锁定所有消息队列
                    ConsumeMessageOrderlyService.this.lockMQPeriodically();
                 catch (Throwable e) 
                    log.error("scheduleAtFixedRate lockMQPeriodically exception", e);
                
            
        , 1000 * 1, ProcessQueue.REBALANCE_LOCK_INTERVAL, TimeUnit.MILLISECONDS);
    

内部调用RebalanceImpl#lockAll方法锁定该客户端分配的所有消费队列。

/**
 * ConsumeMessageOrderlyService的方法
 * <p>
 * 锁定所有消息队列
 */
public synchronized void lockMQPeriodically() 
    if (!this.stopped) 
        //锁定所有消息队列
        this.defaultMQPushConsumerImpl.getRebalanceImpl().lockAll();
    

1.2 lockAll锁定所有消息队列

该方法尝试锁定所有消息队列。

  1. 根据processQueueTable的数据,构建brokerName到其所有mq的map集合brokerMqs。在负载均衡并为当前消费者新分配消息队列的时候,也会对新分配的消息队列申请broker加锁,加锁成功后才会创建对应的processQueue存入processQueueTable。也就是说,如果是顺序消息,那么processQueueTable中的数据一定是曾经加锁成功了的。
  2. 遍历brokerMqs,调用MQClientAPIImpl#lockBatchMQ的方法,向broker发送同步请求,Code为LOCK_BATCH_MQ,请求批量锁定消息队列,返回锁住的mq集合。
  3. 遍历锁住的mq集合,获取对应的processQueue,设置processQueue的状态,设置locked为true,重新设置加锁的时间。遍历没有锁住的mq,设置locked为false。
/**
 * RebalanceImpl的方法
 * <p>
 * 定时每20s尝试锁定所有消息队列
 */
public void lockAll() 
    /*
     * 1 根据processQueueTable的数据,构建brokerName到所有mq的map集合
     * 在新分配消息队列的时候,也会对新分配的消息队列申请broker加锁,加锁成功后会创建对应的processQueue存入processQueueTable
     * 也就是说,如果是顺序消息,那么processQueueTable的数据一定是曾经加锁成功了的
     */
    HashMap<String, Set<MessageQueue>> brokerMqs = this.buildProcessQueueTableByBrokerName();

    Iterator<Entry<String, Set<MessageQueue>>> it = brokerMqs.entrySet().iterator();
    //遍历集合
    while (it.hasNext()) 
        Entry<String, Set<MessageQueue>> entry = it.next();
        final String brokerName = entry.getKey();
        final Set<MessageQueue> mqs = entry.getValue();

        if (mqs.isEmpty())
            continue;
        //获取指定brokerName的master地址。
        FindBrokerResult findBrokerResult = this.mQClientFactory.findBrokerAddressInSubscribe(brokerName, MixAll.MASTER_ID, true);
        if (findBrokerResult != null) 
            LockBatchRequestBody requestBody = new LockBatchRequestBody();
            requestBody.setConsumerGroup(this.consumerGroup);
            requestBody.setClientId(this.mQClientFactory.getClientId());
            requestBody.setMqSet(mqs);

            try 
                /*
                 * 2 向broker发送同步请求,Code为LOCK_BATCH_MQ,请求批量锁定消息队列,返回锁住的mq集合
                 */
                Set<MessageQueue> lockOKMQSet =
                        this.mQClientFactory.getMQClientAPIImpl().lockBatchMQ(findBrokerResult.getBrokerAddr(), requestBody, 1000);
                //遍历锁住的mq集合
                for (MessageQueue mq : lockOKMQSet) 
                    //获取对应的processQueue,设置processQueue的状态
                    ProcessQueue processQueue = this.processQueueTable.get(mq);
                    if (processQueue != null) 
                        if (!processQueue.isLocked()) 
                            log.info("the message queue locked OK, Group:  ", this.consumerGroup, mq);
                        
                        //设置locked为true
                        processQueue.setLocked(true);
                        //设置加锁的时间
                        processQueue.setLastLockTimestamp(System.currentTimeMillis());
                    
                
                //遍历没有锁住的mq,设置locked为false
                for (MessageQueue mq : mqs) 
                    if (!lockOKMQSet.contains(mq)) 
                        ProcessQueue processQueue = this.processQueueTable.get(mq);
                        if (processQueue != null) 
                            processQueue.setLocked(false);
                            log.warn("the message queue locked Failed, Group:  ", this.consumerGroup, mq);
                        
                    
                
             catch (Exception e) 
                log.error("lockBatchMQ exception, " + mqs, e);
            
        
    

1.1.1 lockBatchMQ批量锁定消息队列

该方法向broker发送同步请求,Code为LOCK_BATCH_MQ,请求批量锁定消息队列,返回锁住的mq集合。

/**
 * MQClientAPIImpl的方法
 *
 * 向broker发送同步请求,Code为LOCK_BATCH_MQ,请求批量锁定消息队列,返回锁住的mq集合
 * @param addr broker地址
 * @param requestBody 请求体
 * @param timeoutMillis 超时时间
 * @return
 * @throws RemotingException
 * @throws MQBrokerException
 * @throws InterruptedException
 */
public Set<MessageQueue> lockBatchMQ(
        final String addr,
        final LockBatchRequestBody requestBody,
        final long timeoutMillis) throws RemotingException, MQBrokerException, InterruptedException 
    //Code为LOCK_BATCH_MQ
    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.LOCK_BATCH_MQ, null);

    request.setBody(requestBody.encode());
    //同步请求,调用brokerVIPChannel判断是否开启vip通道,如果开启了,那么将brokerAddr的port – 2,因为vip通道的端口为普通端口 – 2。
    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
    switch (response.getCode()) 
        case ResponseCode.SUCCESS: 
            //解码
            LockBatchResponseBody responseBody = LockBatchResponseBody.decode(response.getBody(), LockBatchResponseBody.class);
            Set<MessageQueue> messageQueues = responseBody.getLockOKMQSet();
            return messageQueues;
        
        default:
            break;
    

    throw new MQBrokerException(response.getCode(), response.getRemark(), addr);

1.1.2 broker处理批量锁定请求lockBatchMQ

broker端通过AdminBrokerProcessor处理LOCK_BATCH_MQ请求。

lockBatchMQ方法就是broker处理批量锁定请求的方法。*broker内部使用一个ConcurrentMap<String/ group */, ConcurrentHashMap<MessageQueue, LockEntry>> 类型的mqLockTable存储不同的consumerGroup的下面的mq到其锁定的clientId的对应关系。**大概逻辑为:

  1. 首先将已被当前clientId锁定和未锁定的mq分别存储到不同集合。然后对于未锁定的mq尝试加锁。
  2. 获取本地锁,防止并发,这是一个ReentrantLock,所有的group的分配都获得同一个锁。
  3. 获取该消费者组对于消息队列的加锁的情况map,存放着下面的消息队列mq到其获取了锁的消费者客户端id的映射关系。一个消费者组下面的一个mq只能被一个clientId锁定。
  4. 遍历未锁定的mq集合。
    1. 如果mq未被任务客户端锁定,那么设置新建一个LockEntry,设置为当前clientId,表示已被当前请求的客户端锁定,内部设置锁定时间戳lastUpdateTimestamp为当前毫秒时间戳。
    2. 如果已被被当前客户端锁定,并且没有过期。那么重新设置锁定时间为当前时间戳,加入到已锁定的mq中,进行下一次循环。每次锁定过期时间为REBALANCE_LOCK_MAX_LIVE_TIME,默认60s,可通过-Drocketmq.broker.rebalance.lockMaxLiveTime的broker参数设置。
    3. 如果锁已过期,那么设置当前clientId获得了锁,进行下一次循环。
    4. 否则,表示所没有过期并且也不是当前clientId获得的锁,仅仅输出日志,进行下一次循环。
/**
 * RebalanceLockManager的方法
 * <p>
 * 尝试批量锁定,返回锁定的mq
 *
 * @param group    消费者组
 * @param mqs      需要锁定的mq集合
 * @param clientId 客户端id
 * @return 已锁定的mq集合
 */
public Set<MessageQueue> tryLockBatch(final String group, final Set<MessageQueue> mqs,
                                      final String clientId) 

    Set<MessageQueue> lockedMqs = new HashSet<MessageQueue>(mqs.size());
    Set<MessageQueue> notLockedMqs = new HashSet<MessageQueue>(mqs.size());
    //将已经锁定和未锁定的mq分别存储到不同集合
    for (MessageQueue mq : mqs) 
        if (this.isLocked(group, mq, clientId)) 
            lockedMqs.add(mq);
         else 
            notLockedMqs.add(mq);
        
    
    //存在未锁定的集合,那么尝试加锁
    if (!notLockedMqs.isEmpty()) 
        try 
            //获取本地锁,是一个ReentrantLock,所有的group的分配都获得同一个锁
            this.lock.lockInterruptibly();
            try 
                //获取该消费者组对于消息队列的加锁的情况map,存放着消息队列到其获取了锁的消费者客户端id的映射关系
                ConcurrentHashMap<MessageQueue, LockEntry> groupValue = this.mqLockTable.get(group);
                if (null == groupValue) 
                    //初始化一个
                    groupValue = new ConcurrentHashMap<>(32);
                    this.mqLockTable.put(group, groupValue);
                
                //遍历未锁定的集合
                for (MessageQueue mq : notLockedMqs) 
                    LockEntry lockEntry = groupValue.get(mq);
                    //如果该mq未锁定
                    if (null == lockEntry) 
                        //新建一个LockEntry,设置为当前clientId,表示已被当前请求的客户端锁定
                        //内部设置锁定时间戳lastUpdateTimestamp为当前毫秒时间戳
                        lockEntry = new LockEntry();
                        lockEntry.setClientId(clientId);
                        groupValue.put(mq, lockEntry);
                        log.info(
                                "tryLockBatch, message queue not locked, I got it. Group:  NewClientId:  ",
                                group,
                                clientId,
                                mq);
                    
                    //如果已被被当前客户端锁定,并且没有过期
                    //每次锁定过期时间为REBALANCE_LOCK_MAX_LIVE_TIME,默认60s,可通过-Drocketmq.broker.rebalance.lockMaxLiveTime的broker参数设置
                    if (lockEntry.isLocked(clientId)) 
                        //重新设置锁定时间为当前时间戳
                        lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                        //加入到已锁定的mq中
                        lockedMqs.add(mq);
                        //下一次循环
                        continue;
                    
                    //获取客户端id,此时表示不是当前clientId获得的锁,或者锁已经过期
                    String oldClientId = lockEntry.getClientId();
                    //如果锁已过期
                    if (lockEntry.isExpired()) 
                        //那么设置当前clientId获得了锁
                        lockEntry.setClientId(clientId);
                        lockEntry.setLastUpdateTimestamp(System.currentTimeMillis());
                        log.warn(
                                "tryLockBatch, message queue lock expired, I got it. Group:  OldClientId:  NewClientId:  ",
                                group,
                                oldClientId,
                                clientId,
                                mq);
                        //加入到已锁定的mq中
                        lockedMqs.add(mq);
                        //下一次循环
                        continue;
                    
                    //到此表示mq被其他客户端锁定了
                    log.warn(
                            "tryLockBatch, message queue locked by other client. Group:  OtherClientId:  NewClientId:  ",
                            group,
                            oldClientId,
                            clientId,
                            mq);
                
             finally 
                //本地锁解锁
                this.lock.unlock();
            
         catch (InterruptedException e) 
            log.error("putMessage exception", e);
        
    
    //返回已锁定mq集合
    return lockedMqs;

1.1.1.1.1. 分布式mq锁小结

consumeMessageService服务在DefaultMQPushConsumerImpl#start方法中被初始化并启动,即调用start方法。该方法将会通过scheduledExecutorService定时任务锁定所有分配的mq,保证同时只有一个消费端可以消费。

实际上,在之前学习的DefaultMQPushConsumer负载均衡服务的部分就知道,在集群模式加上顺序消费的情况下,一定是要向broker申请messageQueue锁成功之后,才能构建processQueue并且加入到processQueueTable,才能在随后发起拉取消息的请求,所以说,这里的定时任务,仅仅是遍历processQueueTable的所有mq并且申请锁定,起作用更多的是向broker进行分布式mq锁的续期操作。

对于从broker锁定的mq,在客户端的过期时间默认为30s,可以通过客户端启动参数-Drocketmq.client.rebalance.lockMaxLiveTime参数设置。但是在broker端看来,这个锁的过期时间默认60s,可以通过broekr启动参数-Drocketmq.broker.rebalance.lockMaxLiveTime设置。

2 submitConsumeRequest提交消费请求

该方法首先会判断是否分发消费,即dispathToConsume是否为true,如果允许,那么将创建一个ConsumeRequest提交到ConsumeMessageOrderlyService内部的consumeExecutor线程池中进行异步消费,否则什么也不做。注意并没有将消息没有放进ConsumeRequest,因为消费线程会自动拉取treeMap中的消息。

什么时候dispathToConsume为true呢?当当前processQueue的内部的msgTreeMap中有消息,并且consuming=false,即还没有开始消费时,将会返回true,即新提交一个消费任务进去激活消费。如果已经在消费了,那么不会提交新的消费任务,老的消费任务会自动去msgTreeMap拉取消息。

注意这里的dispathToConsume应该是dispatchToConsume,这是RocketMQ中的语法错误。

/**
 * ConsumeMessageOrderlyService的方法
 * 提交顺序消费请求
 *
 * @param msgs             拉取到的消息
 * @param processQueue     处理队列
 * @param messageQueue     消息队列
 * @param dispathToConsume 是否分发消费
 */
@Override
public void submitConsumeRequest(
        final List<MessageExt> msgs,
        final ProcessQueue processQueue,
        final MessageQueue messageQueue,
        final boolean dispathToConsume) 
    //如果允许分发消费
    if (dispathToConsume) 
        //构建消费请求,没有将消费放进去,消费消费会自动拉取treemap中的消息
        ConsumeRequest consumeRequest = new ConsumeRequest(processQueue, messageQueue);
        //将请求提交到consumeExecutor线程池中进行消费
        this.consumeExecutor.submit(consumeRequest);
    

3 ConsumeRequest执行消费任务

ConsumeRequest作为线程任务被ConsumeMessageOrderlyService内部的consumeExecutor线程池异步的执行。

/*
 * 并发消费线程池
 * 最小、最大线程数默认20,阻塞队列为无界阻塞队列LinkedBlockingQueue
 */
this.consumeExecutor = new ThreadPoolExecutor(
        this.defaultMQPushConsumer.getConsumeThreadMin(),
        this.defaultMQPushConsumer.getConsumeThreadMax(),
        1000 * 60,
        TimeUnit.MILLISECONDS,
        this.consumeRequestQueue,
        new ThreadFactoryImpl(consumeThreadPrefix));
//单线程的延迟任务线程池,用于定时执行锁定请求以及延迟提交新的消费请求
this.scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(new ThreadFactoryImpl("ConsumeMessageScheduledThread_"));

首先,我们要记住,在之前学习的DefaultMQPushConsumer负载均衡服务的部分就知道,在集群模式加上顺序消费的情况下,一定是要向broker申请messageQueue锁成功之后,才能构建processQueue并且加入到processQueueTable,才能在随后发起拉取消息的请求,所以说,能够进入到ConsumeRequest执行消息消费情况,一定是此前获得过broker的messageQueue锁。

ConsumeRequest的run方法,就是顺序消费的核心方法。下面我们来看看它的大概逻辑:

  1. 如果处理队列被丢弃,那么直接返回,不再消费,例如负载均衡时该队列被分配给了其他新上线的消费者,尽量避免重复消费。
  2. 消费消息之前先获取当前messageQueue的本地锁,锁对象是一个Object对象,每一个mq对应一个不同的Object,采用原生的synchronized阻塞式的获取同步锁。这将导致ConsumeMessageOrderlyService的线程池中的线程将不会同时并发的消费同一个队列。
  3. 如果是广播模式,或者是集群模式,并且锁定了processQueue处理队列,并且processQueue处理队列锁没有过期,那么可以消费消息。p**rocessQueue处理队列锁定实际上就是在负载均衡的时候向broker申请的消息队列分布式锁,申请成功之后将processQueue.locked属性置为true。**内部一个循环中不断的消费,直到消费超时或者条件不满足退出循环。
    1. 如果处理队列被丢弃,那么直接返回,不再消费,例如负载均衡时该队列被分配给了其他新上线的消费者,尽量避免重复消费。
    2. 如果是集群模式,并且没有锁定了processQueue处理队列,或者processQueue处理队列锁已经过期,那么调用tryLockLaterAndReconsume尝试延迟10ms请求broekr加锁并重新延迟提交新的消费请求。
    3. 计算消费时间。如果单次消费任务的消费时间大于默认60s,那么延迟10ms提交新的消费请求,并且结束循环,本次消费任务结束。单次最大消费时间可以通过-Drocketmq.client.maxTimeConsumeContinuously配置启动参数来设置时间。
    4. 调用getConsumeMessageBatchMaxSize方法,获取单次批量消费的数量consumeBatchSize,默认1,可以通过DefaultMQPushConsumer.consumeMessageBatchMaxSize的属性配置。
    5. 调用takeMessages方法,从processQueue内部的msgTreeMap有序map集合中获取offset最小的consumeBatchSize条消息,按顺序从最小的offset返回,保证有序性。
    6. 调用resetRetryAndNamespace方法,重置重试topic,当消息是重试消息的时候,将msg的topic属性从重试topic还原为真实的topic。
    7. 如果takeMessages方法拉取到了消息,那么进行消费。
      1. 如果有钩子,那么执行consumeMessageBefore前置方法。我们可以通过DefaultMQPushConsumerImpl#registerConsumeMessageHook方法注册消费钩子ConsumeMessageHook,在消费消息的前后调用。
      2. 真正消费消息之前再获取processQueue的本地消费锁,保证消息消费时,一个处理队列不会被并发消费。从这里可知,顺序消费需要获取三把锁,broker的messageQueue锁,本地的messageQueue锁,本地的processQueue锁。
      3. 调用listener#consumeMessage方法,进行消息消费,调用实际的业务逻辑,返回执行状态结果,有四种状态,ConsumeOrderlyStatus.SUCCESS 和 ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT推荐使用,ConsumeOrderlyStatus.ROLLBACK和ConsumeOrderlyStatus.COMMIT已被废弃。
      4. 解锁,

        以上是关于RocketMQ源码(22)—ConsumeMessageOrderlyService顺序消费消息源码的主要内容,如果未能解决你的问题,请参考以下文章

        RocketMQ源码—RocketMQ源码调试环境准备

        源码分析RocketMQ系列索引

        RocketMQ 源码合集

        RocketMQ 源码合集

        #yyds干货盘点# 一文带你 RocketMQ 源码调试环境搭建

        RocketMQ源码(21)—ConsumeMessageConcurrentlyService并发消费消息源码