RabbitMQ-死信队列
Posted 插肩而过放个p
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ-死信队列相关的知识,希望对你有一定的参考价值。
死信队列
死信的概念
顾名思义就是无法被消费的消息,一般来说,producer 将消息投递到 broker 或者直接到 queue 里了,consumer 从 queue 取出消息进行消费,但某些时候由于特定的原因导致 queue 中的某些消息无法被消费,这样的消息如果没有后续的处理,就变成了死信,有死信自然就有了死信队列。
死信消息来源
- 消息 TTL 过期
- 队列达到最大长度(队列满了,无法再添加数据到 mq 中)
- 消息被拒绝(basic.reject 或 basic.nack)并且 requeue=false.
死信实战
代码架构图
消息TTL过期
生产者代码
public class Producer
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] args)throws Exception
try(Channel channel = RabbitMqUtils.getChannel())
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
AMQP.BasicProperties properties = new AMQP.BasicProperties().builder().expiration("10000").build();
for (int i = 1;i < 11;i ++)
String message = "info" + i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
System.out.println("生产者发送信息");
消费者C1代码(启动后关闭模拟器接收不到消息)
public class Consumer01
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue,false,false,false,null);
channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",DEAD_EXCHANGE);
params.put("x-dead-letter-routing-key","lisi");
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue,false,false,false,params);
channel.queueBind(normalQueue,NORMAL_EXCHANGE,"zhangsan");
System.out.println("wait...");
DeliverCallback deliverCallback = (consumerTag,delivery) ->
String message = new String(delivery.getBody(),"UTF-8");
System.out.println("Consumer01接收到消息" + message);
;
channel.basicConsume(normalQueue,true,deliverCallback,consumerTag -> );
消费者C2代码
public class Consumer02
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信队列消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息" + message);
;
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag ->
);
结果
此时normal-queue中有10条消息
normal-queue中消息未被消耗转入dead-queue(死信队列)
运行Consumer02接收死信队列信息
结果
队列达到最大长度
生产者
public class Producer
private static final String NORMAL_EXCHANGE = "normal_exchange";
public static void main(String[] argv) throws Exception
try (Channel channel = RabbitMqUtils.getChannel())
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
//该信息是用作演示队列个数限制
for (int i = 1; i <11 ; i++)
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",null, message.getBytes());
System.out.println("生产者发送消息:"+message);
C1消费者
public class Consumer01
private static final String NORMAL_EXCHANGE = "normal_exchange";
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] args) throws Exception
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue,false,false,false,null);
channel.queueBind(deadQueue,DEAD_EXCHANGE,"lisi");
Map<String,Object> params = new HashMap<>();
params.put("x-dead-letter-exchange",DEAD_EXCHANGE);
params.put("x-dead-letter-routing-key","lisi");
params.put("x-max-length",6);
String normalQueue = "normal-queue";
channel.queueDeclare(normalQueue,false,false,false,params);
channel.queueBind(normalQueue,NORMAL_EXCHANGE,"zhangsan");
System.out.println("wait...");
DeliverCallback deliverCallback = (consumerTag,delivery) ->
String message = new String(delivery.getBody(),"UTF-8");
System.out.println("Consumer01接收到消息" + message);
;
channel.basicConsume(normalQueue,true,deliverCallback,consumerTag -> );
消息被拒
生产者
和上一个代码一样
C1消费者
public class Consumer02
private static final String DEAD_EXCHANGE = "dead_exchange";
public static void main(String[] argv) throws Exception
Channel channel = RabbitMqUtils.getChannel();
channel.exchangeDeclare(DEAD_EXCHANGE, BuiltinExchangeType.DIRECT);
String deadQueue = "dead-queue";
channel.queueDeclare(deadQueue, false, false, false, null);
channel.queueBind(deadQueue, DEAD_EXCHANGE, "lisi");
System.out.println("等待接收死信队列消息.....");
DeliverCallback deliverCallback = (consumerTag, delivery) ->
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("Consumer02 接收死信队列的消息" + message);
;
channel.basicConsume(deadQueue, true, deliverCallback, consumerTag ->
);
C2消费者
//和上一个代码一样
以上是关于RabbitMQ-死信队列的主要内容,如果未能解决你的问题,请参考以下文章