RabbitMQ中的死信队列
Posted 杀手不太冷!
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ中的死信队列相关的知识,希望对你有一定的参考价值。
死信队列
概念
当queue消息队列中的消息由于一些原因没办法被消费,如果这些消息一直没办法被处理,就会从这个正常的消息队列转移到死信消息队列中。
应用场景:当用户下单之后,如果一直不支付,那么用户下单的这条消息就会被存放到死信队列中去。
死信的来源
1.当queue消息队列满的时候,再进来的消息就会被放到死信队列中去。
2.在消息应答的时候,如果消费者一直没有告诉RabbitMQ有没有成功处理消息,那么RabbitMQ消息队列就不清楚自己到底要不要删除这条消息,这个时候消息队列中的消息一直没办法处理,这样这条消息也会被放到死信队列中去。
3.消息TTL过期,什么意思?TTL的全拼是Time To Live意思是指存活时间,就是消息队列中存放的消息一般都是有一定时间的,超过了这个时间,这条消息就会被放到死信队列中去。
用一个例子来演示死信队列
我们现在有一个思路,现在有一个发送者,这个发送者会往正常的队列中发送消息,但是在发送消息的时候会给这些消息设置一个TTL(Time to Live)也即是消息的存活过期时间为10s,超过了这个时间,那么正常队列中的消息就会被转移到死信队列中去;然后我们让发送者往正常队列中发送10条消息,如果在10s内没有消费者去消费正常队列中的消息,那么这些消息就会被转移到死信队列中;
最后因为死信队列也是一个消息队列,所以我们也可以写一个消费者,让消费者消费死信队列中的消息。
上面的思路演示如下:
写消费者1
首先写一个消费者1也即是Consumer1,在消费者1中我们要声明正常队列,死信队列,正常队列的交换机,死信队列的交换机,还要指定正常队列里面的消息出现TTL的时候,这些消息会被存放到哪个死信队列中,消费者1如下图:
/**
* @Date 2021/11/12 15:00
* @Author 望轩
*
* 死信队列
* 消费者1
*/
public class Consumer01 {
//普通交换机的名称
public static final String NORMAL_EXCHANGE="normal_exchange";
//死信交换机的名称
public static final String DEAD_EXCHANGE="dead_exchange";
//普通队列的名称
public static final String NORMAL_QUEUE="normal_queue";
//死信队列的名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//声明死信和普通交换机类型为direct类型
channel.exchangeDeclare(NORMAL_EXCHANGE, BuiltinExchangeType.DIRECT);
channel.exchangeDeclare(DEAD_EXCHANGE,BuiltinExchangeType.DIRECT);
//声明普通队列
Map<String,Object> arguments=new HashMap<>();
arguments.put("x-dead-letter-exchange",DEAD_EXCHANGE);
arguments.put("x-dead-letter-routing-key","lisi");
channel.queueDeclare(NORMAL_QUEUE,false,false,false,arguments);
//声明死信队列
channel.queueDeclare(DEAD_QUEUE,false,false,false,null);
//绑定普通的交换机与普通的队列
channel.queueBind(NORMAL_QUEUE,NORMAL_EXCHANGE,"zhangsan");
//绑定死信的交换机与死信的队列
channel.queueBind(DEAD_QUEUE,DEAD_EXCHANGE,"lisi");
//如果能成功接收到消息会调用的回调函数
DeliverCallback deliverCallback=(consumerTag, message)->{
System.out.println("Consumer01接收者接收到的消息:"+new String(message.getBody()));
};
//如果取消从消息队列中获取消息时会调用的回调函数
CancelCallback cancelCallback= consumerTag->{
System.out.println("消息消费被中断");
};
channel.basicConsume(NORMAL_QUEUE,true,deliverCallback,cancelCallback);
}
}
写生产者
我们本次的生产者主要是用来往正常队列中发送消息的,它不用管死信队列是什么样的,死信队列是什么样的那是消费者1关心的事情,生产者如下图:
/**
* @Date 2021/11/12 15:22
* @Author 望轩
*
* 死信队列之生产者代码
*/
public class Producer01 {
//普通交换机的名称
public static final String NORMAL_EXCHANGE="normal_exchange";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//死信消息,设置TTL时间,Time to live,消息存活时间,单位是ms
AMQP.BasicProperties properties=new AMQP.BasicProperties()
.builder().expiration("1000").build();
for(int i=1;i<11;i++){
String message="info"+i;
channel.basicPublish(NORMAL_EXCHANGE,"zhangsan",properties,message.getBytes());
}
}
}
写消费者2
消费者2主要是用来消费死信队列中的消息的,如下图:
/**
* @Date 2021/11/12 15:36
* @Author 望轩
*/
public class Consumer02 {
//死信队列的名称
public static final String DEAD_QUEUE="dead_queue";
public static void main(String[] args) throws IOException, TimeoutException {
Channel channel = RabbitMQUtil.getChannel();
//如果能成功接收到消息会调用的回调函数
DeliverCallback deliverCallback=(consumerTag, message)->{
System.out.println("Consumer02接收者接收死信队列中的东西:"+new String(message.getBody()));
};
//如果取消从消息队列中获取消息时会调用的回调函数
CancelCallback cancelCallback= consumerTag->{
System.out.println("消息消费被中断");
};
channel.basicConsume(DEAD_QUEUE,true,deliverCallback,cancelCallback);
}
}
测试
因为我们是在Consumer1消费者1里面声明的队列和交换机,所以第一步我们一定要先启动Consumer1消费者,然后会在RabbitMQ中生成对应的队列和交换机。
启动完Consumer1消费者之后,我们的队列和交换机都已经生成了,所以我们就可以用发送者往正常队列里面发送消息了,我们启动生产者,但是注意这个时候要关闭消费者1,不要让消费者1消费正常队列中的消息,这样的话,在10s之后,由于正常队列中的消息一直没有被消费,所以它会把正常队列中的信息都转移到死信队列中,如下图:
接着我们启动完消费者2服务之后,由于消费者2服务会消费死信队列中的消息,所以会发生如下情况,如下图:
以上是关于RabbitMQ中的死信队列的主要内容,如果未能解决你的问题,请参考以下文章