RabbitMQ——死信队列的三大来源应用举例

Posted 张起灵-小哥

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ——死信队列的三大来源应用举例相关的知识,希望对你有一定的参考价值。

1.什么是死信队列?

先从概念解释上搞清楚这个定义,死信,顾名思义就是无法被消费的消息,字面意思可以这样理解,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。

应用场景: 为了保证订单业务的消息数据不丢失,需要使用到 RabbitMQ 的死信队列机制,当消息消费发生异常时,将消息投入死信队列中.还有比如说: 用户在商城下单成功并点击去支付后在指定时间未支付时自动失效。

三大来源:

  • 消息 TTL 过期
  • 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
  • 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false


2.三大来源之消息TTL过期

我们就参考上面的架构图来写代码。首先是生产者,其中还有一个工具类代码。

package com.szh.rabbitmq.utils;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

/**
 *
 */
public class RabbitMqUtils {
    public static Channel getChannel() {
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("192.168.40.130");
        factory.setPort(5672);
        factory.setUsername("root");
        factory.setPassword("root");
        Connection connection = null;
        Channel channel = null;
        try {
            connection = factory.newConnection();
            channel = connection.createChannel();
        } catch (IOException e) {
            e.printStackTrace();
        } catch (TimeoutException e) {
            e.printStackTrace();
        }
        return channel;
    }
}
package com.szh.rabbitmq.dead;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.szh.rabbitmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * 死信队列之生产者
 */
public class Producer {

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //设置接收消息的过期时间,超过这个时间则转到死信队列
        AMQP.BasicProperties properties = new AMQP.BasicProperties()
                .builder().expiration("10000").build();
        for (int i = 1; i <= 10; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zql",properties,message.getBytes(StandardCharsets.UTF_8));
        }
    }
}

下面是两个消费者。

package com.szh.rabbitmq.dead;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.szh.rabbitmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列之消费者01
 */
public class Consumer01 {

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();

        //绑定交换机、声明交换机的类型
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

        //绑定普通队列, 正常队列绑定死信队列信息
        Map<String, Object> arguments = new HashMap<>();
        //正常队列设置死信交换机, 参数 key 是固定值
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //正常队列设置死信 routing-key, 参数 key 是固定值
        arguments.put("x-dead-letter-routing-key","szh");
        //正常队列最大长度限制
        //arguments.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //绑定死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //普通交换机与普通队列进行绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zql");
        //死信交换机与死信队列进行绑定
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"szh");
        System.out.println("等待接收消息.....");

        DeliverCallback deliverCallback = (consumerTag,message) -> {
//            String msg = new String(message.getBody());
//            if ("info5".equals(msg)) {
//                System.out.println(msg + "此消息已被Consumer01拒绝....");
//                //执行拒绝策略,被拒绝的消息将转到死信队列中
//                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
//            } else {
//                System.out.println("Consumer01接收的消息是:" + msg);
//                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
//            }
            String msg = new String(message.getBody());
            System.out.println("Consumer01接收的消息是:" + msg);
        };
        //修改autoAck为false,表示不自动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});
    }
}
package com.szh.rabbitmq.dead;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.szh.rabbitmq.utils.RabbitMqUtils;

import java.io.IOException;

/**
 *
 */
public class Consumer02 {

    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("等待接收消息.....");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});
    }
}

下面我们测试一下,因为有关队列的声明都写在了第一个消费者中,所以先启动第一个消费者,然后模拟消息TTL过期(直接将消费者1down掉)。

然后再启动生产者,此时会向MQ中发送10条消息,这些消息此时会存在normal_queue中。由于消费者1已经down掉,它自然接收不到消息,那么等消息过期之后(在生产者代码中设定的是10s),这些消息会被转到死信队列dead_queue中,此时再启动消费者2,它就可以从死信队列中接收到这10条消息。


3.三大来源之队列达到最大长度

工具类和上面的案例是一样的,其余的生产者和消费者稍有变动。

package com.szh.rabbitmq.dead;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.szh.rabbitmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * 死信队列之生产者
 */
public class Producer {

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //设置接收消息的过期时间,超过这个时间则转到死信队列
//        AMQP.BasicProperties properties = new AMQP.BasicProperties()
//                .builder().expiration("10000").build();
        for (int i = 1; i <= 10; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zql",null,message.getBytes(StandardCharsets.UTF_8));
        }
    }
}
package com.szh.rabbitmq.dead;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.szh.rabbitmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列之消费者01
 */
public class Consumer01 {

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();

        //绑定交换机、声明交换机的类型
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

        //绑定普通队列, 正常队列绑定死信队列信息
        Map<String, Object> arguments = new HashMap<>();
        //正常队列设置死信交换机, 参数 key 是固定值
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //正常队列设置死信 routing-key, 参数 key 是固定值
        arguments.put("x-dead-letter-routing-key","szh");
        //正常队列最大长度限制
        arguments.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //绑定死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //普通交换机与普通队列进行绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zql");
        //死信交换机与死信队列进行绑定
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"szh");
        System.out.println("等待接收消息.....");

        DeliverCallback deliverCallback = (consumerTag,message) -> {
//            String msg = new String(message.getBody());
//            if ("info5".equals(msg)) {
//                System.out.println(msg + "此消息已被Consumer01拒绝....");
//                //执行拒绝策略,被拒绝的消息将转到死信队列中
//                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
//            } else {
//                System.out.println("Consumer01接收的消息是:" + msg);
//                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
//            }
            String msg = new String(message.getBody());
            System.out.println("Consumer01接收的消息是:" + msg);
        };
        //修改autoAck为false,表示不自动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});
    }
}
package com.szh.rabbitmq.dead;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.szh.rabbitmq.utils.RabbitMqUtils;

import java.io.IOException;

/**
 *
 */
public class Consumer02 {

    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("等待接收消息.....");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});
    }
}

下面我们测试一下,还是先启动消费者1确保MQ中已经有了相应的交换机和队列,然后将消费者1先停掉,去启动生产者,先向MQ中发送10条消息,看看结果。

此时由于消费者1被停掉了,它就无法接收消息,而它所承受的队列最大长度为6,所以这6个会堆积在normal_queue队列中,剩下的 10-6=4 条消息会转到死信队列中。

当我们启动消费者1、2之后,可以看到它们能够接收到相应队列中的消息。


4.三大来源之消息被拒绝

package com.szh.rabbitmq.dead;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.szh.rabbitmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.nio.charset.StandardCharsets;

/**
 * 死信队列之生产者
 */
public class Producer {

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();
        //设置接收消息的过期时间,超过这个时间则转到死信队列
//        AMQP.BasicProperties properties = new AMQP.BasicProperties()
//                .builder().expiration("10000").build();
        for (int i = 1; i <= 10; i++) {
            String message = "info" + i;
            channel.basicPublish(NORMAL_EXCHANGE,"zql",null,message.getBytes(StandardCharsets.UTF_8));
        }
    }
}
package com.szh.rabbitmq.dead;

import com.rabbitmq.client.BuiltinExchangeType;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.rabbitmq.client.Delivery;
import com.szh.rabbitmq.utils.RabbitMqUtils;

import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * 死信队列之消费者01
 */
public class Consumer01 {

    //普通交换机的名称
    public static final String NORMAL_EXCHANGE = "normal_exchange";
    //死信交换机的名称
    public static final String DEAD_EXCHANGE = "dead_exchange";
    //普通队列的名称
    public static final String NORMAL_QUEUE = "normal_queue";
    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();

        //绑定交换机、声明交换机的类型
        channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
        channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);

        //绑定普通队列, 正常队列绑定死信队列信息
        Map<String, Object> arguments = new HashMap<>();
        //正常队列设置死信交换机, 参数 key 是固定值
        arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
        //正常队列设置死信 routing-key, 参数 key 是固定值
        arguments.put("x-dead-letter-routing-key","szh");
        //正常队列最大长度限制
        //arguments.put("x-max-length",6);
        channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
        //绑定死信队列
        channel.queueDeclare(DEAD_QUEUE,false,false,false,null);

        //普通交换机与普通队列进行绑定
        channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zql");
        //死信交换机与死信队列进行绑定
        channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"szh");
        System.out.println("等待接收消息.....");

        DeliverCallback deliverCallback = (consumerTag,message) -> {
            String msg = new String(message.getBody());
            if ("info5".equals(msg)) {
                System.out.println(msg + "此消息已被Consumer01拒绝....");
                //执行拒绝策略,被拒绝的消息将转到死信队列中
                channel.basicReject(message.getEnvelope().getDeliveryTag(),false);
            } else {
                System.out.println("Consumer01接收的消息是:" + msg);
                channel.basicAck(message.getEnvelope().getDeliveryTag(),false);
            }
//            String msg = new String(message.getBody());
//            System.out.println("Consumer01接收的消息是:" + msg);
        };
        //修改autoAck为false,表示不自动应答
        channel.basicConsume(NORMAL_QUEUE,false,deliverCallback,consumerTag -> {});
    }
}
package com.szh.rabbitmq.dead;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.DeliverCallback;
import com.szh.rabbitmq.utils.RabbitMqUtils;

import java.io.IOException;

/**
 *
 */
public class Consumer02 {

    //死信队列的名称
    public static final String DEAD_QUEUE = "dead_queue";

    public static void main(String[] args) throws IOException {
        Channel channel = RabbitMqUtils.getChannel();

        System.out.println("等待接收消息.....");

        DeliverCallback deliverCallback = (consumerTag, message) -> {
            System.out.println("Consumer02接收的消息是:" + new String(message.getBody()));
        };
        channel.basicConsume(DEAD_QUEUE,true,deliverCallback,consumerTag -> {});
    }
}

下面我们测试一下,还是先启动消费者1确保MQ中已经有了相应的交换机和队列,然后将消费者1先停掉,去启动生产者,先向MQ中发送10条消息,看看结果。

生产者消息发送完毕之后,因为消费者1被down掉了,所以这10条消息被堆积到了normal_queue队列中。

此时我们再启动消费者1,可以看到它正常的去MQ中消费,但是其中的info5被拒绝了,而这个拒绝的消息就会转到死信队列中。

在死信队列中就看到了info5这条消息,此时再启动消费者2,它就可以顺利的去死信队列中消费了。

以上是关于RabbitMQ——死信队列的三大来源应用举例的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ中的死信队列

利用rabbitMq的死信队列实现延时消息

MQ 死信队列/延迟队列-( 商品秒杀后30分钟之内付款)

MQ 死信队列/延迟队列-( 商品秒杀后30分钟之内付款)

SpringBoot整合RabbitMQ实现死信队列

SpringBoot+RabbitMQ 死信队列