RocketMQ - 如何用死信队列解决消费者异常

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RocketMQ - 如何用死信队列解决消费者异常相关的知识,希望对你有一定的参考价值。

参考技术A 假设我们的MQ使用都没有问题,但是如果消费者系统的数据库挂了呢?因为我们一直都是假设了一个场景,就是生产者在处理完自己的逻辑之后会推消息到MQ,然后下游消费者系统从MQ里获取消息去执行后续的处理。

那么如果这个时候,消费者系统的数据库宕机了,同样会使消费者从MQ里获取到消息之后,消费线程就会挂掉,没办法继续进行处理。

所以针对这样的场景,消费者系统要怎么处理?应该如何重试?

在下面代码片段中,我们注册了一个监听器回调函数,当consumer获取到消息之后,就会调用这个函数进行处理

我们可以在这个回调函数中对消息进行处理,处理完之后,就可以告诉RocketMQ Consumer这批消息的处理结果。

比如,如果返回的是CONSUME_SUCCESS,那么Consumer就知道这批消息处理完成了,就会提交这批消息的offset到broker上去,然后下次就会继续从broker上获取下一批消息来处理。

但是如果此时我们在上面的回调函数中,对一批消息进行处理的时候,因为数据库宕机了,导致处理逻辑无法完成,此时我们还能返回CONSUME_SUCCESS吗?如果你返回的话,下次就会处理下一批消息,但是这批消息其实没有处理成功,此时必然就导致这批消息丢失了。

如果因为数据库宕机,导致对这批消息处理是异常的,就应该返回一个RECONSUME_LATER状态。

告诉RocketMQ这批消息处理有异常,过段时间再次给我这批消息让我重新试一下。

所以我们的代码应该改成下面这样:

那么RocketMQ在收到你返回的RECONSUME_LATER状态之后,是如何让你进行消费重试的呢?

RocketMQ有一个针对这个ConsumerGroup的重试队列,如果返回了RECONSUME_LATER状态,他会把你这批消息放到这个消费组的重试队列中去。

比如你的消费者组的名称是"VoucherConsumerGroup",那么他会有一个“%RETRY%VoucherConsumerGroup”这个名字的重试队列。

重试队列中的消息会按照配置的时间再次给消费者,让消费者进行处理,如果再次失败,那么会再过一段时间让消费者进行处理,默认最多是重试16次,每次重试之间的间隔时间是可以配置的:

那么如果在16次重试范围内消息处理成功了,自然就没问题了,但是如果你对一批消息重试了16次还是无法处理成功呢?这个时候会把消息放到死信队列中。

其实就是一批消息交给消费者去处理,消费者重试了16次还一直没有处理成功,就不要继续重试这批消息了,就可以认为他们死掉了就可以了,然后这批消息会自动进入死信队列。

死信队列的名字是"%DLQ%VoucherConsumerGroup"

RabbitMQ—SpringBoot中实现死信队列

一、什么是死信、死信队列?

1.死信

一条消息经历过下面4种情况任意一种时,会变成死信

  • 1.消费者端配置自动ack acknowledge-mode: auto在重试几次后(配置文件配置默认是3)后进入到死信队列
  • 2.消费者端使用basic.reject和basic.nack拒绝签收消息,并且配置requeue参数是false(即不重回原来的队列)
  • 3.消息在队列存活的时间大于TTL
  • 4.消息队列中消息的数量超过最大长度

2.死信队列

一条消息变为死信时会被重新发布的交换机叫死信交换机,与死信交换机绑定的队列叫做死信队列。跟普通的队列没有多大的区别,多了几个参数而已。如果没有为队列配置死信交换机,则原有的消息被抛弃。用下面的图来展示一下

二、如何配置死信队列?

项目版本:springboot:2.3.6.RELEASE。
引入的依赖:spring-boot-starter-amqpspring-rabbit

1.yml配置文件

spring:
  rabbitmq:
    host: 192.168.184.128
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: auto # 设置自动确定(ack) manual:为手动确定(即需要调用channel.basicAck才会从队列中删除消息)
        prefetch: 1              #表示消费者端每次从队列拉取多少个消息进行消费,直到手动确认消费完毕后,才会继续拉取下一条
        default-requeue-rejected: false #消费被拒绝时 true:为重回队列 false为否
        retry:
          enabled: true         #是否支持重试,默认false
          max-attempts: 3       #重试最大次数,默认3次
          max-interval: 1000ms #重试最大间隔时间

2.声明业务队列和死信队列

通过下面的配置将业务队列和死信队列绑定起来,当一条消息从业务队列中变成死信后会重发到死信交换机上,重新消费。

   //声明业务queue时绑定死信交换机、死信队列及相关参数配置
   @Bean
   public Queue queue(){
       Map<String, Object> args = new HashMap<>(2);
       args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
       args.put("x-dead-letter-routing-key",DEAD_LETTER_ROUTING_KEY);
       //队列的长度为3
       args.put("x-max-length",3);
       //队列中的消息超过3s后过期
       args.put("x-message-ttl",new Long(3000));
       Queue queue = new Queue(QUEUE, true,false,false,args);
       return queue;
   }

上面的配置中两个条件会让消息变为死信

  • args.put(“x-max-length”,3):队列中消息总数大于3
  • args.put(“x-message-ttl”,new Long(3000)):消息超过3s还未被消费

3.生产者

正常使用RabbitTemplate提供的api发送消息即可

rabbitTemplate.convertAndSend(RabbitMQStudentCourseTopicConfig.EXCHANGE,            RabbitMQStudentCourseTopicConfig.ROUTING_KEY_NAME,new MqMessage(studentId,courseId));

4.消费者

消费者分为两类,业务消费者和死信消费者

业务消费者

会报除数不能0的异常

int count=0;

@RabbitListener(queues = RabbitMQStudentCourseTopicConfig.QUEUE)
public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
    MqMessage mqMessage = MessageHelper.msgToObj(message, MqMessage.class)
    log.info("重试次数:{}",++count);
    int i = 5/0;
    log.info("消费成功");
}

死信队列监听者

@RabbitListener(queues = RabbitMQStudentCourseTopicConfig.DEAD_LETTER_QUEUE)
public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException {
    System.out.println("收到死信消息:" + new String(message.getBody()));
    channel.basicAck(deliveryTag,false);
}

5.RabbitMQ控制台检验


已经创建好了两个队列

三、测试死信及如何处理死信

1.自动ack超过重试次数进入死信队列


从上图的重试时间及次数来看也符合配置max-interval: 1000msmax-attempts: 3

2.手动签收失败不重回队列进入死信队列

yml文件配置:acknowledge-mode: manual

消费者代码修改如下

  int count=0;
  @RabbitListener(queues = RabbitMQStudentCourseTopicConfig.QUEUE)
  public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
      MqMessage mqMessage = MessageHelper.msgToObj(message, MqMessage.class);
      log.info("重试次数:{}",++count);
      try{
          int i = 5/0;
      }catch (Exception e){
      	  //第三个false:失败不重回队列
          channel.basicNack(deliveryTag,false,false);
          log.info("消费失败");
          throw e;
      }
      log.info("消费成功");
  }

注意:当yml文件配置消费方签收方式为手动时,重试次数有时候起作用,有时候不起作用。但是起作用时不是失败多少次才会进死信队列,而是只要执行channel.basicNack(deliveryTag,false,false);这行代码就会进死信队列


那么如何实现手动签收时,失败3次才进入死信队列呢?我们放到下一篇文章中说。

3.消息在队列存活的时间大于TTL进入死信队列

这个测试只要将业务消费者注释掉即可,设置的TTL是3s,测试结果符合预期

4.消息队列中消息的数量超过最大长度进入死信队列

还是把消费者端代码注释,然后运行,看结果

生产者投递消息时间:2021-10-05 18:30:51,id:2904,
生产者投递消息时间:2021-10-05 18:31:08,id:430,
生产者投递消息时间:2021-10-05 18:31:15,id:4646,
生产者投递消息时间:2021-10-05 18:31:29,id:395,
收到死信消息:{"id":2904}

因为队列长度设置的是3,当投放第4个消息即id为395时,最早进入队列id2904被弹出进入死信队列,可见rabbitmq是一个先进先出的队列。


配置踩的坑

问题1

#method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'STUDENT_COURSE_QUEUE' in vhost '/': received none but current is the value '60000' of type 'signedint', class-id=50, method-id=10)

原因1:在为队列配置死信交换机前已经声明了该队列,rabbitmq不允许重复,所以把之前的队列删掉就可以了
原因2:如果在消费端使用了@RabbitListener的queuesToDeclare属性一定要配置好,这个属性就是显示创建queue。否则也会发生上面的错误

解决办法:通过管理控制台将原来的queue删掉,使用queues属性指定要创建的queue就可以。

以上是关于RocketMQ - 如何用死信队列解决消费者异常的主要内容,如果未能解决你的问题,请参考以下文章

RocketMQ重试队列与死信队列简介

RocketMQ重试队列与死信队列简介

RocketMQ:死信队列和消息幂等

RocketMQ的死信队列

RocketMQ的死信队列

消息队列 - 死信、延迟、重试队列