分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)

Posted 芋道源码

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)相关的知识,希望对你有一定的参考价值。

摘要: 原创出处 http://www.iocoder.cn/RocketMQ/message-pull-and-consume-second/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 RocketMQ 4.0.x 正式版

  • 1、概述

  • 2、Consumer

  • 3、PushConsumer 一览

  • 4、PushConsumer 订阅

  • 5、PushConsumer 消息队列分配

  • 6、PushConsumer 消费进度读取

  • 7、PushConsumer 拉取消息

  • 8、PushConsumer 消费消息

  • 9、PushConsumer 发回消费失败消息

  • 10、Consumer 消费进度

  • 11、结尾


1、概述

本文接:《RocketMQ 源码分析 —— Message 拉取与消费(上)》。

主要解析 Consumer消费 逻辑涉及到的源码。

2、Consumer

MQ 提供了两类消费者:

  • PushConsumer:

    • 在大多数场景下使用。

    • 名字虽然是 Push 开头,实际在实现时,使用 Pull 方式实现。通过 Pull 不断不断不断轮询 Broker 获取消息。当不存在新消息时, Broker 会挂起请求,直到有新消息产生,取消挂起,返回新消息。这样,基本和 Broker 主动 Push 做到接近的实时性(当然,还是有相应的实时性损失)。原理类似 长轮询( Long-Polling )。

  • PullConsumer

本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费
本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费
本文主要讲解 PushConsumer,部分讲解 PullConsumer,跳过 顺序消费

3、PushConsumer 一览

先看一张 PushConsumer 包含的组件以及组件之间的交互图:

  • RebalanceService:均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。当有新的 Consumer 的加入或移除,都会重新分配消息队列。

  • PullMessageService:拉取消息服务,不断不断不断从 Broker 拉取消息,并提交消费任务到 ConsumeMessageService

  • ConsumeMessageService:消费消息服务,不断不断不断消费消息,并处理消费结果。

  • RemoteBrokerOffsetStore: Consumer 消费进度管理,负责从 Broker 获取消费进度,同步消费进度到 Broker

  • ProcessQueue :消息处理队列。

  • MQClientInstance :封装对 Namesrv, Broker 的 API调用,提供给 Producer、 Consumer 使用。

4、PushConsumer 订阅

DefaultMQPushConsumerImpl#subscribe(...)

 
   
   
 
  1.  1: public void subscribe(String topic, String subExpression) throws MQClientException {

  2.  2:     try {

  3.  3:         // 创建订阅数据

  4.  4:         SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(), //

  5.  5:             topic, subExpression);

  6.  6:         this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);

  7.  7:         // 通过心跳同步Consumer信息到Broker

  8.  8:         if (this.mQClientFactory != null) {

  9.  9:             this.mQClientFactory.sendHeartbeatToAllBrokerWithLock();

  10. 10:         }

  11. 11:     } catch (Exception e) {

  12. 12:         throw new MQClientException("subscription exception", e);

  13. 13:     }

  14. 14: }

  • 说明 :订阅 Topic 。

  • 第 3 至 6 行 :创建订阅数据。详细解析见:FilterAPI.buildSubscriptionData(...)。

  • 第 7 至 10 行 :通过心跳同步 Consumer 信息到 Broker

FilterAPI.buildSubscriptionData(...)

 
   
   
 
  1.  1: public static SubscriptionData buildSubscriptionData(final String consumerGroup, String topic,

  2.  2:     String subString) throws Exception {

  3.  3:     SubscriptionData subscriptionData = new SubscriptionData();

  4.  4:     subscriptionData.setTopic(topic);

  5.  5:     subscriptionData.setSubString(subString);

  6.  6:     // 处理订阅表达式

  7.  7:     if (null == subString || subString.equals(SubscriptionData.SUB_ALL) || subString.length() == 0) {

  8.  8:         subscriptionData.setSubString(SubscriptionData.SUB_ALL);

  9.  9:     } else {

  10. 10:         String[] tags = subString.split("\\|\\|");

  11. 11:         if (tags.length > 0) {

  12. 12:             for (String tag : tags) {

  13. 13:                 if (tag.length() > 0) {

  14. 14:                     String trimString = tag.trim();

  15. 15:                     if (trimString.length() > 0) {

  16. 16:                         subscriptionData.getTagsSet().add(trimString);

  17. 17:                         subscriptionData.getCodeSet().add(trimString.hashCode());

  18. 18:                     }

  19. 19:                 }

  20. 20:             }

  21. 21:         } else {

  22. 22:             throw new Exception("subString split error");

  23. 23:         }

  24. 24:     }

  25. 25:

  26. 26:     return subscriptionData;

  27. 27: }

  • 说明 :根据 Topic 和 订阅表达式 创建订阅数据

  • subscriptionData.subVersion = System.currentTimeMillis()。

DefaultMQPushConsumer#registerMessageListener(...)

 
   
   
 
  1.  1: public void registerMessageListener(MessageListenerConcurrently messageListener) {

  2.  2:     this.messageListener = messageListener;

  3.  3:     this.defaultMQPushConsumerImpl.registerMessageListener(messageListener);

  4.  4: }

  • 说明 :注册消息监听器。

5、PushConsumer 消息队列分配

分布式消息队列 RocketMQ 源码分析 —— Message 拉取与消费(下)

RebalanceService

 
   
   
 
  1.  1: public class RebalanceService extends ServiceThread {

  2.  2:

  3.  3:     /**

  4.  4:      * 等待间隔,单位:毫秒

  5.  5:      */

  6.  6:     private static long waitInterval =

  7.  7:         Long.parseLong(System.getProperty(

  8.  8:             "rocketmq.client.rebalance.waitInterval", "20000"));

  9.  9:

  10. 10:     private final Logger log = ClientLogger.getLog();

  11. 11:     /**

  12. 12:      * MQClient对象

  13. 13:      */

  14. 14:     private final MQClientInstance mqClientFactory;

  15. 15:

  16. 16:     public RebalanceService(MQClientInstance mqClientFactory) {

  17. 17:         this.mqClientFactory = mqClientFactory;

  18. 18:     }

  19. 19:

  20. 20:     @Override

  21. 21:     public void run() {

  22. 22:         log.info(this.getServiceName() + " service started");

  23. 23:

  24. 24:         while (!this.isStopped()) {

  25. 25:             this.waitForRunning(waitInterval);

  26. 26:             this.mqClientFactory.doRebalance();

  27. 27:         }

  28. 28:

  29. 29:         log.info(this.getServiceName() + " service end");

  30. 30:     }

  31. 31:

  32. 32:     @Override

  33. 33:     public String getServiceName() {

  34. 34:         return RebalanceService.class.getSimpleName();

  35. 35:     }

  36. 36: }

  • 说明 :均衡消息队列服务,负责分配当前 Consumer 可消费的消息队列( MessageQueue )。


  • 第 26 行 :调用 MQClientInstance#doRebalance(...) 分配消息队列。目前有三种情况情况下触发:

    详细解析见:MQClientInstance#doRebalance(...)。


    • 如 25 等待超时,每 20s 调用一次。

    • PushConsumer 启动时,调用 rebalanceService#wakeup(...) 触发。

    • Broker 通知 Consumer 加入 或 移除时, Consumer 响应通知,调用 rebalanceService#wakeup(...) 触发。

MQClientInstance#doRebalance(...)

 
   
   
 
  1.  1: public void doRebalance() {

  2.  2:     for (Map.Entry<String, MQConsumerInner> entry : this.consumerTable.entrySet()) {

  3.  3:         MQConsumerInner impl = entry.getValue();

  4.  4:         if (impl != null) {

  5.  5:             try {

  6.  6:                 impl.doRebalance();

  7.  7:             } catch (Throwable e) {

  8.  8:                 log.error("doRebalance exception", e);

  9.  9:             }

  10. 10:         }

  11. 11:     }

  12. 12: }