RabbitMQ死信队列

Posted LvhaoIT

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ死信队列相关的知识,希望对你有一定的参考价值。

RabbitMQ(五)死信队列

文章目录


6. 死信队列

​ 死信:无法被消费的消息,在某些时候由于特定的原因到导致了queue中某些消息无法被消费,这样的消息如果没有后续处理,就会成为死信,有了死信,就自然有了死信队列

​ 应用场景,为了保证订单业务的数据不丢失,需要使用到RabbitMQ的死信队列机制,当消息发生消费异常时,将消息投入死信队列中。场景:用户在商城下单成功并点击支付后在指定时间未支付时自动失效。

6.1 死信产生的原因

1. 消息TTL过期
2. 队列达到了最大长度(队列已经排满了,无法再添加到MQ中)
3. 消息被拒绝(basic.reject 或 basic.nack)并且requeue = false

6.2 死信队列实战

6.2.1 代码架构图

6.2.2 消息TTL过期

​ 我们可以在交换机声明处,或者队列声明处设置消息过期时间TTL,当消息超过这个时间还没有被消费,则会转入死信队列。

​ 我们在这次实战中通过关闭消费端,由生产者发送十条信息,消息发送后没有被消费,超时后就会发送到死信队列中,由死信队列消费。

  1. 生产者 Producer (设置过期时间TTL,发送消息)

    public class Producer 
      public static void main(String[] args) throws Exception
        Channel channel = RabbitMqUtil.getChannel();
    
        //死信消息 设置TTL时间 单位是ms
        AMQP.BasicProperties properties =
          new AMQP.BasicProperties()
          .builder().expiration("10000").build();
    
        for (int i = 1; i <= 10; i++) 
          String message = "info"+i;
          channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",properties,message.getBytes("UTF-8"));
          System.out.println("发送消息 "+message+" 成功");
        
      
    
    
  2. 普通消费者 Consumer01 (启动后关闭该消费者,模拟其接收不到消息)

    public class Consumer01 
    
      public static void main(String[] args)throws Exception 
        Channel channel = RabbitMqUtil.getChannel();
        //声明普通交换机和死信交换机
        //普通交换机
        channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
    
        HashMap<String, Object> arguments = new HashMap<>();
    
        //过期时间 10s 队列处也可以设置ttl 但是一般在交换机处设置
        //        arguments.put("x-message-ttl",10000);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
    
        //声明普通队列
        channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
    
        /
        //声明死信队列
        channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
    
        //绑定普通队列
        channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
    
        //绑定死信队列
        channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
    
        System.out.println("等待接收消息。。。。");
        //ttl接收消息
        channel.basicConsume(QueueName.Normal_Queue.getName(), true, (consumerTag, message) -> 
          String msg = new String(message.getBody(), "UTF-8");
          System.out.println("ConsumerO1接收到消息:" + msg);
          System.out.println("处理完成");
        , (consumerTag) -> 
          System.out.println("Consumer01 错误消息被中断:" + consumerTag);
        );
    
      
    
    
    
  3. 死信队列消费者 Consumer02 (启动后关闭此队列,观察消息去向后打开消费掉死信)

    /**
     * 死信队列, 消费者2
     */
    public class Consumer02 
    
        public static void main(String[] args)throws Exception 
            Channel channel = RabbitMqUtil.getChannel();
            //声明死信交换机
            channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
    
            //声明死信队列
            channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
    
            //绑定死信队列
            channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
    
            System.out.println("等待接收消息。。。。");
            //接收消息
            channel.basicConsume(QueueName.Dead_Queue.getName(), true,(consumerTag,message)->
                System.out.println("Consumer02 接收到消息:"+new String(message.getBody(),"UTF-8"));
            ,tag->);
    
        
    
    
    

操作步骤

  1. 查看未发送消息状态下 队列状态

  2. 启动生产者代码 发送十条消息 此时正常队列中有十条未消费信息

  3. 等待时间过去10s,等待消息过期,正常队列中的消息由于没有被消费,消息进入死信队列

  4. 启动死信队列,可以看到过期的消息被一条一条消费。

6.2.3 队列达到最大长度

​ 我们可以在队列声明处设置队列的长度,当队列中消息超过这个长度时,则会转入死信队列。

​ 我们在这次实战中通过给消费者设置队列长度,然后将它关闭,不消费信息,消息发送后超过这个长度,超过这个长度的消息就会发送到死信队列中,由死信队列消费。

  1. 生产者 Producer

    public class Producer 
      public static void main(String[] args) throws Exception
        Channel channel = RabbitMqUtil.getChannel();
    
        for (int i = 1; i <= 10; i++) 
          String message = "info"+i;
          channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",null,message.getBytes("UTF-8"));
          System.out.println("发送消息 "+message+" 成功");
        
      
    
    
  2. 普通消费者 Consumer01 (启动后关闭该消费者,模拟其接收不到消息)

    public class Consumer01 
    
      public static void main(String[] args)throws Exception 
        Channel channel = RabbitMqUtil.getChannel();
        //声明普通交换机和死信交换机
        //普通交换机
        channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
    
        HashMap<String, Object> arguments = new HashMap<>();
    
        //设置队列最大长度,达到这个长度将后续消息转发给c2消费者
        arguments.put("x-max-length",6);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
    
        //声明普通队列
        channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
    
        /
        //声明死信队列
        channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
    
        //绑定普通队列
        channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
    
        //绑定死信队列
        channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
    
        System.out.println("等待接收消息。。。。");
        //ttl接收消息
        channel.basicConsume(QueueName.Normal_Queue.getName(), true, (consumerTag, message) -> 
          String msg = new String(message.getBody(), "UTF-8");
          System.out.println("ConsumerO1接收到消息:" + msg);
          System.out.println("处理完成");
        , (consumerTag) -> 
          System.out.println("Consumer01 错误消息被中断:" + consumerTag);
        );
    
      
    
    
    
  3. 死信队列消费者 Consumer02 (启动后开启此队列,观察消息是否被死信队列消费)

    /**
     * 死信队列, 消费者2
     */
    public class Consumer02 
    
        public static void main(String[] args)throws Exception 
            Channel channel = RabbitMqUtil.getChannel();
            //声明死信交换机
            channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
    
            //声明死信队列
            channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
    
            //绑定死信队列
            channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
    
            System.out.println("等待接收消息。。。。");
            //接收消息
            channel.basicConsume(QueueName.Dead_Queue.getName(), true,(consumerTag,message)->
                System.out.println("Consumer02 接收到消息:"+new String(message.getBody(),"UTF-8"));
            ,tag->);
    
        
    
    
    

操作步骤

​ 注意:因为我们修改了队列的参数,所以需要将原来的队列删除后才能重新创建。

  1. 启动两个消费者 然后关闭,打开生产者(可以发现超出长度的消息自动转入死信队列中)

  2. 启动两个消费者,可以看到死信队列消费了其中四条消息

6.2.5 消息被拒绝

​ 我们可以不使用自动应答,而采用手动应答,在手动应答中的拒绝应答,并且拒绝消息重新入队,若存在死信队列,则消息会转入死信队列。

​ 我们在这次实战中通过在消息应答里拒绝一部分应答,并阻止消息重新入队,这条消息就会发送到死信队列中,由死信队列消费。

  1. 生产者 Producer

    public class Producer 
      public static void main(String[] args) throws Exception
        Channel channel = RabbitMqUtil.getChannel();
    
        for (int i = 1; i <= 10; i++) 
          String message = "info"+i;
          channel.basicPublish(ExchangeName.Normal_Exchange.getName(), "zhangsan",null,message.getBytes("UTF-8"));
          System.out.println("发送消息 "+message+" 成功");
        
      
    
    
  2. 普通消费者 Consumer01

    public class Consumer01 
    
      public static void main(String[] args)throws Exception 
        Channel channel = RabbitMqUtil.getChannel();
        //声明普通交换机和死信交换机
        //普通交换机
        channel.exchangeDeclare(ExchangeName.Normal_Exchange.getName(), BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(ExchangeName.Dead_Exchange.getName(), BuiltinExchangeType.DIRECT);
    
        HashMap<String, Object> arguments = new HashMap<>();
    
        //设置队列最大长度,达到这个长度将后续消息转发给c2消费者
        arguments.put("x-max-length",6);
        //正常队列设置死信交换机
        arguments.put("x-dead-letter-exchange",ExchangeName.Dead_Exchange.getName());
        //设置死信RoutingKey
        arguments.put("x-dead-letter-routing-key","lisi");
    
        //声明普通队列
        channel.queueDeclare(QueueName.Normal_Queue.getName(), false,false,false,arguments);
    
        /
        //声明死信队列
        channel.queueDeclare(QueueName.Dead_Queue.getName(), false,false,false,null);
    
        //绑定普通队列
        channel.queueBind(QueueName.Normal_Queue.getName(), ExchangeName.Normal_Exchange.getName(), "zhangsan");
    
        //绑定死信队列
        channel.queueBind(QueueName.Dead_Queue.getName(), ExchangeName.Dead_Exchange.getName(), "lisi");
    
        System.out.println("等待接收消息。。。。");
        channel.basicConsume(QueueName.Normal_Queue.getName(), false,(consumerTag,message)->
          String msg = new String(message.getBody(),"UTF-8");
          if ("info5".equals(msg))
            System.out.println(msg+"此消息是被拒绝的!");
            //拒绝应答 1为消息标签,2为是否放回普通队列
            channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
          else 
            System.out.println("Consumer01 接收到消息:"+new String(message.getBody(),"UTF-8"));
            channel.basicAck(message.getEnvelope().getDeliveryTag(),false<

    以上是关于RabbitMQ死信队列的主要内容,如果未能解决你的问题,请参考以下文章

    RabbitMQ一文带你搞定RabbitMQ死信队列

    rabbitmq死信队列及延迟队列

    RabbitMQ—SpringBoot中实现死信队列

    RabbitMQ死信队列

    RabbitMQ的死信队列和延时队列

    RabbitMQ--死信队列/延迟队列--使用/原理