RocketMQ之Consumer

Posted marklogzhu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ之Consumer相关的知识,希望对你有一定的参考价值。

一、Consumer 介绍

1.1 核心参数

* consumerGroup:消费者组名  
* MessageModel:消息模型,定义了消息传递到消费者的方式,默认是 MessageModel.CLUSTERING
    * MessageModel.BROADCASTING:广播
    * MessageModel.CLUSTERING:集群
* consumeFromWhere: 消费者开始消费的位置,默认值是 ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET
	* ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET:从队列最后的位置开始消费
    * ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET:从队列前面最开始消费
    * ConsumeFromWhere.CONSUME_FROM_TIMESTAMP: 从指定时间开始消费,之前的消息将会被忽略    
* consumeTimestamp:
* allocateMessageQueueStrategy:消息分配策略
* subscription:订阅关系
* offsetStore:存储消息偏移量
* consumeThreadMin:线程池最小值,默认值是20
* consumeThreadMax:线程池最大值,默认值是20
* consumeConcurrentlyMaxSpan:单个队列并行消费最大的跨度,默认2000
* pullThresholdForQueue:一个队列最大的消费个数,默认1000
* pullInterval:消息拉取的时间间隔
* pullBatchSize:消息拉取的个数,默认32啊
* consumeMessageBatchMaxSize:批量消费量,默认1  
* messageListener:消息监听器,用来处理消息,它有两个实现类
    * MessageListenerOrderly:按顺序一个个消费    
    * MessageListenerConcurrently:并行消费

二、消费模式

2.1 集群模式

* 同一个 consumerGroup 里,并且订阅的 tag 也必须是一样的,这样的 consumer 实例才能组成 consumer 集群;
* 当 consumer 使用集群消费时,每条消息只会被 consumer 集群内的任意一个 consumer 实例消费一次;
* 默认的消费模式就是集群模式;
* 集群模式天然实现负载均衡机制

2.2 广播模式

* 同一个 consumerGroup 里的 Consumer 会消费订阅 Topic 的全部消息
* 通过 consumer.setMessageModel(MessageModel.BROADCASTING) 方法设置    

三、Offset 介绍

3.1 Offset 是什么

* 在 RocketMQ 中,相同类型的消息会放到一个 Topic 里,为了可以并行操作,一个 Topic 会有多个 MessageQueue。  
* Offset 是指某个 Topic 下的一条消息在某个 MessageQueue 里的位置;
* 通过 Offset 的值可以定位到这条消息

3.2 Offset 类结构

技术图片

从类结构可以看出 Offset 分为本地文件类型和远程文件类型。

3.2 消费模式采用的 Offset 类型

* 集群模式下因为每个 Consumer 消费所订阅主题的一部分,所以采用远程文件存储 Offset;
* 广播模式下,由于每个 Consumer 需要消费所有的消息,所以采用本地文件存储 Offset。

3.3 Offset 文件存储格式

OffseStore 使用 Json 格式存储,例如:

{
    "OffsetTable":{
        1:{
            "brokeName":"localhost",
            "QueueId":1,
            "Topic":"broker1"
        },
       2:{
            "brokeName":"localhost",
            "QueueId":2,
            "Topic":"broker2"
        }
    }
}

四、不同类型的消费者

根据对读取操作的控制情况,可以消费者分为两种类型。一个是 DefaultMQPushConsumer,由系统控制读取操作,收到消息后自动调用传入的处理方法来处理;另一个是 DefaultMQPullConsumer ,读取操作中的大部分功能由使用者自主控制。

4.1 DefaultMQPushConsumer

DefaultMQPushConsumer 只需要设置好各种参数和设置传入处理消息的回调函数即可,系统收到消息后会自动调用处理函数来处理消息,而且加入新的 DefaultMQPushConsumer 后会自动做负载均衡。

4.1.1 实例

public class Consumer {

    public static void main(String[] args) throws MQClientException {
        // 创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("consumerGroupName");
        // 设置服务器地址
        consumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // 订阅指定主题
        consumer.subscribe("topicTest","*");
        // 注册消息监听事件
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                System.out.println("msg:"+msgs);
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        // 启动消费者
        consumer.start();
    }

}

4.2 DefaultMQPullConsumer

4.2.1 消费步骤

1) 读取 topic 的消息队列 message queue 的信息;
2) 按队列去拉取一定数目的消息;
3) 持久化message queue的消费进度 offset;
4) 根据不同的消息状态做不同的处理

4.2.2 拉取结果状态

public enum PullStatus {
	// 拉取成功
    FOUND,
	// 没有消息可以拉取
    NO_NEW_MSG,
	// 过滤结果不匹配
    NO_MATCHED_MSG,
	// 偏移量非法,太大或太小
    OFFSET_ILLEGAL
}

4.2.3 实例

public class PullConsumer {
	// 本地 offset 存储容器,生产环境可以放到数据库或 Redis 中
    private static final Map<MessageQueue, Long> OFFSE_TABLE = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException, RemotingException, InterruptedException, MQBrokerException {
        DefaultMQPullConsumer pullConsumer = new DefaultMQPullConsumer("DefaultMQPullConsumer");
        // 设置服务器地址
        pullConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // 启动消费者
        pullConsumer.start();
        // 从指定 topic 获取所有的队列
        Set<MessageQueue> messageQueues = pullConsumer.fetchSubscribeMessageQueues("topicTest");
        // 遍历队列,拉取消息
        for (MessageQueue mq : messageQueues) {
            System.out.printf("从队列中消费: %s%n", mq);
            SINGLE_MQ:
            while (true) {
                try {
                    // 获取 offset
                    Long offset = getMessageQueueOffset(mq);
                    // 拉取32个消息
                    PullResult pullResult =
                            pullConsumer.pullBlockIfNotFound(mq, null, offset, 32);
                    System.out.printf("%s%n", pullResult);
                    // 保存 offset
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());
                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }

        pullConsumer.shutdown();
    }

    // 保存上次消费的消息下标
    private static void putMessageQueueOffset(MessageQueue mq,
                                              long nextBeginOffset) {
        OFFSE_TABLE.put(mq, nextBeginOffset);
    }

    // 获取上次消费的消息的下标
    private static Long getMessageQueueOffset(MessageQueue mq) {
        Long offset = OFFSE_TABLE.get(mq);
        if (offset != null) {
            return offset;
        }
        return 0l;
    }
}

4.3 DefaultLitePullConsumer

**DefaultMQPullConsumer ** 已经被标识为废弃,替代的是 DefaultLitePullConsumer,下面我们就直接使用 DefaultLitePullConsumer 来操作。

public class LitePullConsumer {

    public static volatile boolean running = true;

    public static void main(String[] args) throws Exception {
        DefaultLitePullConsumer litePullConsumer = new DefaultLitePullConsumer("LitePullConsumer");
        // 设置服务器地址
        litePullConsumer.setNamesrvAddr(RocketMQConfig.NAME_SERVER);
        // 关闭自动提交偏移量
        litePullConsumer.setAutoCommit(false);
        // 启动消费者
        litePullConsumer.start();
        // 获取队列
        Collection<MessageQueue> mqSet = litePullConsumer.fetchMessageQueues("topicTest");
        List<MessageQueue> list = new ArrayList<>(mqSet);
        List<MessageQueue> assignList = new ArrayList<>();
        for (int i = 0; i < list.size() / 2; i++) {
            assignList.add(list.get(i));
        }
        litePullConsumer.assign(assignList);
        litePullConsumer.seek(assignList.get(0), 10);
        try {
            while (running) {
                List<MessageExt> messageExts = litePullConsumer.poll();
                System.out.printf("%s %n", messageExts);
                litePullConsumer.commitSync();
            }
        } finally {
            litePullConsumer.shutdown();
        }

    }
}

以上是关于RocketMQ之Consumer的主要内容,如果未能解决你的问题,请参考以下文章

rocketMQ之延时处理消息

rocketMQ之延时处理消息

RocketMQ源码之 consumer是怎样消费消息的

消息中间件之RocketMQ简介

RocketMQ高可用设计之消息重试机制

RocketMQ使用之消息保证,重复读,积压,顺序,过滤,延时,事务,死信