RabbitMQ中的死信队列

Posted 杀手不太冷!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ中的死信队列相关的知识,希望对你有一定的参考价值。

死信队列

概念

当queue消息队列中的消息由于一些原因没办法被消费,如果这些消息一直没办法被处理,就会从这个正常的消息队列转移到死信消息队列中。

应用场景:当用户下单之后,如果一直不支付,那么用户下单的这条消息就会被存放到死信队列中去。

死信的来源

1.当queue消息队列满的时候,再进来的消息就会被放到死信队列中去。

2.在消息应答的时候,如果消费者一直没有告诉RabbitMQ有没有成功处理消息,那么RabbitMQ消息队列就不清楚自己到底要不要删除这条消息,这个时候消息队列中的消息一直没办法处理,这样这条消息也会被放到死信队列中去。

3.消息TTL过期,什么意思?TTL的全拼是Time To Live意思是指存活时间,就是消息队列中存放的消息一般都是有一定时间的,超过了这个时间,这条消息就会被放到死信队列中去。

用一个例子来演示死信队列

我们现在有一个思路,现在有一个发送者,这个发送者会往正常的队列中发送消息,但是在发送消息的时候会给这些消息设置一个TTL(Time to Live)也即是消息的存活过期时间为10s,超过了这个时间,那么正常队列中的消息就会被转移到死信队列中去;然后我们让发送者往正常队列中发送10条消息,如果在10s内没有消费者去消费正常队列中的消息,那么这些消息就会被转移到死信队列中;

最后因为死信队列也是一个消息队列,所以我们也可以写一个消费者,让消费者消费死信队列中的消息。

上面的思路演示如下:

写消费者1

首先写一个消费者1也即是Consumer1,在消费者1中我们要声明正常队列,死信队列,正常队列的交换机,死信队列的交换机,还要指定正常队列里面的消息出现TTL的时候,这些消息会被存放到哪个死信队列中,消费者1如下图:

/**
 * @Date 2021/11/12 15:00
 * @Author 望轩
 *
 * 死信队列
 * 消费者1
 */
public class Consumer01 {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE="normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE="dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE="normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();

        //声明死信和普通交换机类型为direct类型
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

        //声明普通队列
        Map<String,Object> arguments=new HashMap<>();
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        arguments.put("x-dead-letter-routing-key","lisi");
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);

        //声明死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //绑定普通的交换机与普通的队列
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");

        //绑定死信的交换机与死信的队列
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");

        //如果能成功接收到消息会调用的回调函数
        DeliverCallback deliverCallback=(consumerTag, message)->{
            System.out.println("Consumer01接收者接收到的消息:"+new String(message.getBody()));
        };

        //如果取消从消息队列中获取消息时会调用的回调函数
        CancelCallback cancelCallback= consumerTag->{
            System.out.println("消息消费被中断");
        };

        channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
    }
}

写生产者

我们本次的生产者主要是用来往正常队列中发送消息的,它不用管死信队列是什么样的,死信队列是什么样的那是消费者1关心的事情,生产者如下图:

/**
 * @Date 2021/11/12 15:22
 * @Author 望轩
 *
 * 死信队列之生产者代码
 */
public class Producer01 {
    //普通交换机的名称
    public static final String NORMAL_EXCHANGE="normal_exchange";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();
        //死信消息,设置TTL时间,Time to live,消息存活时间,单位是ms
        AMQP.BasicProperties properties=new AMQP.BasicProperties()
                .builder().expiration("1000").build();

        for(int i=1;i<11;i++){
            String message="info"+i;
            channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
        }
    }
}

写消费者2

消费者2主要是用来消费死信队列中的消息的,如下图:

/**
 * @Date 2021/11/12 15:36
 * @Author 望轩
 */
public class Consumer02 {
    //死信队列的名称
    public static final String DEAD_QUEUE="dead_queue";

    public static void main(String[] args) throws IOException, TimeoutException {
        Channel channel = RabbitMQUtil.getChannel();

        //如果能成功接收到消息会调用的回调函数
        DeliverCallback deliverCallback=(consumerTag, message)->{
            System.out.println("Consumer02接收者接收死信队列中的东西:"+new String(message.getBody()));
        };

        //如果取消从消息队列中获取消息时会调用的回调函数
        CancelCallback cancelCallback= consumerTag->{
            System.out.println("消息消费被中断");
        };

        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
    }
}

测试

因为我们是在Consumer1消费者1里面声明的队列和交换机,所以第一步我们一定要先启动Consumer1消费者,然后会在RabbitMQ中生成对应的队列和交换机。

启动完Consumer1消费者之后,我们的队列和交换机都已经生成了,所以我们就可以用发送者往正常队列里面发送消息了,我们启动生产者,但是注意这个时候要关闭消费者1,不要让消费者1消费正常队列中的消息,这样的话,在10s之后,由于正常队列中的消息一直没有被消费,所以它会把正常队列中的信息都转移到死信队列中,如下图:

接着我们启动完消费者2服务之后,由于消费者2服务会消费死信队列中的消息,所以会发生如下情况,如下图:

以上是关于RabbitMQ中的死信队列的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq死信队列及延迟队列

RabbitMQ死信队列

RabbitMQ实战-死信队列

RabbitMQ 中的死信死信消息

RabbitMQ学习(中)——交换机死信队列和延迟队列

SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)