RabbitMQ—SpringBoot中实现死信队列

Posted 商俊帅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ—SpringBoot中实现死信队列相关的知识,希望对你有一定的参考价值。

一、什么是死信、死信队列?

1.死信

一条消息经历过下面4种情况任意一种时,会变成死信

  • 1.消费者端配置自动ack acknowledge-mode: auto在重试几次后(配置文件配置默认是3)后进入到死信队列
  • 2.消费者端使用basic.reject和basic.nack拒绝签收消息,并且配置requeue参数是false(即不重回原来的队列)
  • 3.消息在队列存活的时间大于TTL
  • 4.消息队列中消息的数量超过最大长度

2.死信队列

一条消息变为死信时会被重新发布的交换机叫死信交换机,与死信交换机绑定的队列叫做死信队列。跟普通的队列没有多大的区别,多了几个参数而已。如果没有为队列配置死信交换机,则原有的消息被抛弃。用下面的图来展示一下

二、如何配置死信队列?

项目版本:springboot:2.3.6.RELEASE。
引入的依赖:spring-boot-starter-amqpspring-rabbit

1.yml配置文件

spring:
  rabbitmq:
    host: 192.168.184.128
    port: 5672
    username: guest
    password: guest
    virtual-host: /
    listener:
      simple:
        acknowledge-mode: auto # 设置自动确定(ack) manual:为手动确定(即需要调用channel.basicAck才会从队列中删除消息)
        prefetch: 1              #表示消费者端每次从队列拉取多少个消息进行消费,直到手动确认消费完毕后,才会继续拉取下一条
        default-requeue-rejected: false #消费被拒绝时 true:为重回队列 false为否
        retry:
          enabled: true         #是否支持重试,默认false
          max-attempts: 3       #重试最大次数,默认3次
          max-interval: 1000ms #重试最大间隔时间

2.声明业务队列和死信队列

通过下面的配置将业务队列和死信队列绑定起来,当一条消息从业务队列中变成死信后会重发到死信交换机上,重新消费。

   //声明业务queue时绑定死信交换机、死信队列及相关参数配置
   @Bean
   public Queue queue(){
       Map<String, Object> args = new HashMap<>(2);
       args.put("x-dead-letter-exchange",DEAD_LETTER_EXCHANGE);
       args.put("x-dead-letter-routing-key",DEAD_LETTER_ROUTING_KEY);
       //队列的长度为3
       args.put("x-max-length",3);
       //队列中的消息超过3s后过期
       args.put("x-message-ttl",new Long(3000));
       Queue queue = new Queue(QUEUE, true,false,false,args);
       return queue;
   }

上面的配置中两个条件会让消息变为死信

  • args.put(“x-max-length”,3):队列中消息总数大于3
  • args.put(“x-message-ttl”,new Long(3000)):消息超过3s还未被消费

3.生产者

正常使用RabbitTemplate提供的api发送消息即可

rabbitTemplate.convertAndSend(RabbitMQStudentCourseTopicConfig.EXCHANGE,            RabbitMQStudentCourseTopicConfig.ROUTING_KEY_NAME,new MqMessage(studentId,courseId));

4.消费者

消费者分为两类,业务消费者和死信消费者

业务消费者

会报除数不能0的异常

int count=0;

@RabbitListener(queues = RabbitMQStudentCourseTopicConfig.QUEUE)
public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
    MqMessage mqMessage = MessageHelper.msgToObj(message, MqMessage.class)
    log.info("重试次数:{}",++count);
    int i = 5/0;
    log.info("消费成功");
}

死信队列监听者

@RabbitListener(queues = RabbitMQStudentCourseTopicConfig.DEAD_LETTER_QUEUE)
public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException {
    System.out.println("收到死信消息:" + new String(message.getBody()));
    channel.basicAck(deliveryTag,false);
}

5.RabbitMQ控制台检验


已经创建好了两个队列

三、测试死信及如何处理死信

1.自动ack超过重试次数进入死信队列


从上图的重试时间及次数来看也符合配置max-interval: 1000msmax-attempts: 3

2.手动签收失败不重回队列进入死信队列

yml文件配置:acknowledge-mode: manual

消费者代码修改如下

  int count=0;
  @RabbitListener(queues = RabbitMQStudentCourseTopicConfig.QUEUE)
  public void doChooseCourse(Message message, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel)throws IOException{
      MqMessage mqMessage = MessageHelper.msgToObj(message, MqMessage.class);
      log.info("重试次数:{}",++count);
      try{
          int i = 5/0;
      }catch (Exception e){
      	  //第三个false:失败不重回队列
          channel.basicNack(deliveryTag,false,false);
          log.info("消费失败");
          throw e;
      }
      log.info("消费成功");
  }

注意:当yml文件配置消费方签收方式为手动时,重试次数有时候起作用,有时候不起作用。但是起作用时不是失败多少次才会进死信队列,而是只要执行channel.basicNack(deliveryTag,false,false);这行代码就会进死信队列


那么如何实现手动签收时,失败3次才进入死信队列呢?我们放到下一篇文章中说。

3.消息在队列存活的时间大于TTL进入死信队列

这个测试只要将业务消费者注释掉即可,设置的TTL是3s,测试结果符合预期

4.消息队列中消息的数量超过最大长度进入死信队列

还是把消费者端代码注释,然后运行,看结果

生产者投递消息时间:2021-10-05 18:30:51,id:2904,
生产者投递消息时间:2021-10-05 18:31:08,id:430,
生产者投递消息时间:2021-10-05 18:31:15,id:4646,
生产者投递消息时间:2021-10-05 18:31:29,id:395,
收到死信消息:{"id":2904}

因为队列长度设置的是3,当投放第4个消息即id为395时,最早进入队列id2904被弹出进入死信队列,可见rabbitmq是一个先进先出的队列。


配置踩的坑

问题1

#method<channel.close>(reply-code=406, reply-text=PRECONDITION_FAILED - inequivalent arg 'x-message-ttl' for queue 'STUDENT_COURSE_QUEUE' in vhost '/': received none but current is the value '60000' of type 'signedint', class-id=50, method-id=10)

原因1:在为队列配置死信交换机前已经声明了该队列,rabbitmq不允许重复,所以把之前的队列删掉就可以了
原因2:如果在消费端使用了@RabbitListener的queuesToDeclare属性一定要配置好,这个属性就是显示创建queue。否则也会发生上面的错误

解决办法:通过管理控制台将原来的queue删掉,使用queues属性指定要创建的queue就可以。

以上是关于RabbitMQ—SpringBoot中实现死信队列的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot整合RabbitMQ实现死信队列

SpringBoot+RabbitMQ 死信队列

SpringBoot+RabbitMQ 死信队列

springBoot集成rabbitmq 之延时(死信)队列

SpringBoot+RabbitMQ 死信队列

SpringBoot+RabbitMQ 死信队列