RocketMQ 死信队列 | 消费者出现异常如何处理?

Posted shuiyj

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ 死信队列 | 消费者出现异常如何处理?相关的知识,希望对你有一定的参考价值。

RocketMQ 重复消费问题 | 订单系统核心流程引入幂等性机制一文中,我们讨论了消息重复消费的问题,比较好的方案是采用在消费侧使用业务判断法来保证接口的幂等性,这样就能避免消息重复消费的问题。

今天要讨论的是消费者代码执行过程中出现异常,我们应该如何处理?

手动提交 offset

首先来看一段代码,Consumer 类是一个消费者类,它我们为它注册了一个监听器,在处理完消息之后,会将消息的状态返回给 RocketMQ,执行成功返回的是消息状态是 CONSUME_SUCCESS

public class Consumer {
    public static void main(String[] args) throws MQClientException {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("test_consumer");

        // 设置 NameServer 地址
        consumer.setNamesrvAddr("");
        // 订阅 Topic
        consumer.subscribe("TopicTest", "*");
        // 这次回调接口,接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
								// 对消息的处理,比如发放优惠券、积分等
								return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });
        consumer.start();
    }
}

画一张图来表示向 RocketMQ 提交消息状态的流程,如图所示:

技术图片

消息者业务代码出现异常怎么办?

再来看一下消费者的代码中监听器的部分,它说如果消息处理成功,那么就返回消息状态为 CONSUME_SUCCESS,也有可能发放优惠券、积分等操作出现了异常,比如说数据库挂掉了。这个时候应该怎么处理呢?

consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
								// 对消息的处理,比如发放优惠券、积分等
								return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

我们可以把代码改一改,捕获异常之后返回消息的状态为 RECONSUME_LATER 表示稍后重试。

// 这次回调接口,接收消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List <MessageExt> list, ConsumeConcurrentlyContext consumeConcurrentlyContext) {
                try {
                    // 对消息的处理,比如发放优惠券、积分等
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } catch (Exception e) {
                    // 万一发生数据库宕机等异常,返回稍后重试消息的状态
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }

            }
        });

重试队列

这个时候,消息会进入到 RocketMQ 的重试队列中。

  • 比如说消费者所属的消息组名称为AAAConsumerGroup
  • 其重试队列名称就叫做%RETRY%AAAConsumerGroup
  • 重试队列中的消息过一段时间会再次发送给消费者,如果还是无法正常执行会再次进入重试队列
  • 默认重试16次,还是无法执行,消息就会从重试队列进入到死信队列

技术图片

死信队列

  • 重试队列中的消息重试16次任然无法执行,将会进入到死信队列
  • 死信队列的名字是 %DLQ%AAAConsumerGroup
  • 死信队列中的消息可以后台开一个线程,订阅%DLQ%AAAConsumerGroup,并不停重试

技术图片

总结

技术图片

本文从消费者的业务代码出现异常讲起,介绍了 RocketMQ 的重试队列和死信队列:

  1. 代码正常执行返回消息状态为CONSUME_SUCCESS,执行异常返回RECONSUME_LATER
  2. 状态为RECONSUME_LATER的消息会进入到重试队列,重试队列的名称为 %RETRY% + ConsumerGroupName
  3. 重试16次消息任然没有处理成功,消息就会进入到死信队列%DLQ% + ConsumerGroupName;

以上是关于RocketMQ 死信队列 | 消费者出现异常如何处理?的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ重试队列与死信队列简介

RocketMQ重试队列与死信队列简介

RocketMQ:死信队列和消息幂等

RocketMQ的死信队列

RocketMQ的死信队列

消息队列 - 死信、延迟、重试队列