RocketMQ源码解析-消息消费

Posted langtutengyixiao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ源码解析-消息消费相关的知识,希望对你有一定的参考价值。

RocketMQ源码解析-消息消费

1.消费者相关类

2.消费者的启动

3.消息的拉取

4.消费者的负载均衡

5.消息的消费

6.消费进度管理

看了很多遍的代码,还是决定动手把记录下来,梳理一下整体结构和实现细节,给大家一个参考,写的不好的地方请多多指教
RocketMQ中消息的消费分为2种方式,一种是pull模式,一种为push模式(基于pull模式实现),大部分的业务场合下业界用的比较多的是push模式,一句话你没有特殊需求就用push,push模式可以达到准实时的消息推送
那什么时候可以用pull模式呢?比如在高并发的场景下,消费端的性能可能会达到瓶颈的情况下,消费端可以采用pull模式,消费端根据自身消费情况去拉取,虽然push模式在消息拉取的过程中也会有流控(当前ProcessQueue队列有1000条消息还没有消费或者当前ProcessQueue中最大偏移量和最小偏移量超过2000将会触发流控,流控的策略就是延迟50ms再拉取消息),但是这个值在实际情况下,可能每台机器的性能都不太一样,会不好控制。

消费者相关类和API

技术图片

  • MQConsumer
  • sendMessageBack(),If consuming failure,message will be send back to the broker,and delay consuming some time,如果消费端消费失败会把消息发送回broker端,稍后会重新消费
  • fetchSubscribeMessageQueues(topic) ,Fetch message queues from consumer cache according to the topic,根据topic从消费端缓存中获取MessageQueue的Set集合
  • MQPushConsumer
  • registerMessageListener() 注册消息事件监听器,有并发模式和顺序模式俩种
  • subscribe() 基于主题订阅消息,可以带上消息过滤表达式subExpression,TAG或者SQL92,类模式过滤会在5.0.0版本中去掉,不建议使用
  • DefaultMQPushConsumer
  • 先看下DefaultMQPushConsumer的重要参数,如果第一次看会不记得,后面的代码看多了慢慢会体会到这些参数的作用,类中持有DefaultMQPushConsumerImpl对象,DefaultMQPushConsumerImpl类实现了大部分的消息消费功能
//消费组
private String consumerGroup;
//消费端模式,默认为集群模式,还有一种广播模式
private MessageModel messageModel = MessageModel.CLUSTERING;
//根据消费进度从broker拉取不到消息时采取的策略
//1.CONSUME_FROM_LAST_OFFSET 最大偏移量开始
//2.CONSUME_FROM_FIRST_OFFSET 最小偏移量开始
//3.CONSUME_FROM_TIMESTAMP 从消费者启动时间戳开始
private ConsumeFromWhere consumeFromWhere = ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET;
//集群模式下消息队列负载策略
private AllocateMessageQueueStrategy allocateMessageQueueStrategy;

//消息过滤关系
private Map<String /* topic */, String /* sub expression */> subscription = new HashMap<String, String>();

   //消息消费监听器
    private MessageListener messageListener;

    //消息消费进度存储器
    private OffsetStore offsetStore;

    //消费线程最小线程数
    private int consumeThreadMin = 20;

    //消费线程最大线程数,因为消费线程池用的是无界队列,所以这个参数用不上,原因请参考线程池原理
    private int consumeThreadMax = 64;

    //动态调整线程数量的阀值
    private long adjustThreadPoolNumsThreshold = 100000;

    //并发消费时拉取消息前会有流控,会判断处理队列中最大偏移量和最小偏移量的跨度,不能大于2000
    private int consumeConcurrentlyMaxSpan = 2000;

    //push模式下任务拉取的时间间隔
    private long pullInterval = 0;

    //每次消费者实际消费的数量,不是从broker端拉取的数量
    private int consumeMessageBatchMaxSize = 1;

    //从broker端拉取的数量
    private int pullBatchSize = 32;

    //是否每次拉取之后都跟新订阅关系
    private boolean postSubscriptionWhenPull = false;

   //消息最大消费重试次数
    private int maxReconsumeTimes = -1;

    //延迟将该消息提交到消费者的线程池等待时间,默认1s
    private long suspendCurrentQueueTimeMillis = 1000;

    //消费超时时间,15分钟
    private long consumeTimeout = 15;

消费者的启动

代码入口,DefaultMQPushConsumerImpl#start()方法

 public synchronized void start() throws MQClientException {
        switch (this.serviceState) {
            case CREATE_JUST:
                ...
                this.serviceState = ServiceState.START_FAILED;
                //1.检查配置信息
                this.checkConfig();
                //2.加工订阅信息,将Map<String /* topic*/, String/* subExtentions*/>转换为Map<String,SubscriptionData>,同时,如果消息消费模式为集群模式,还需要为该消费组创建一个重试主题。
                this.copySubscription();

进入copySubscription()方法:

Map<String, String> sub = this.defaultMQPushConsumer.getSubscription();
            if (sub != null) {
                for (final Map.Entry<String, String> entry : sub.entrySet()) {
                    final String topic = entry.getKey();
                    final String subString = entry.getValue();
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        topic, subString);
                    this.rebalanceImpl.getSubscriptionInner().put(topic, subscriptionData);
                }
            }

            if (null == this.messageListenerInner) {
                this.messageListenerInner = this.defaultMQPushConsumer.getMessageListener();
            }

            switch (this.defaultMQPushConsumer.getMessageModel()) {
                case BROADCASTING:
                    break;
                case CLUSTERING:
                    //集群模式下给topic创建一个retry的topic,retry+comsumerGroup
                    final String retryTopic = MixAll.getRetryTopic(this.defaultMQPushConsumer.getConsumerGroup());
                    SubscriptionData subscriptionData = FilterAPI.buildSubscriptionData(this.defaultMQPushConsumer.getConsumerGroup(),
                        retryTopic, SubscriptionData.SUB_ALL);
                    this.rebalanceImpl.getSubscriptionInner().put(retryTopic, subscriptionData);
 ···

以上是关于RocketMQ源码解析-消息消费的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ 源码阅读 ---- 顺序消息

RocketMQ源码系列 broker启动流程源码解析

4RocketMQ 源码解析之 网络通信 Netty

4RocketMQ 源码解析之 网络通信 Netty

源码分析RocketMQ系列索引

RocketMQ源码(21)—ConsumeMessageConcurrentlyService并发消费消息源码