RabbitMQ 死信队列
Posted JAVA炭烧
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ 死信队列相关的知识,希望对你有一定的参考价值。
日积月累,水滴石穿 😄
什么是死信队列
DLX ,全称为 Dead-Letter-Exchange
,可以称之为死信交换机。当消息在一个队列中变成死信( dead message )之后,它能被重新被发送到另一个交换器中,这个交换器就是 DLX ,绑定 DLX 的队列就称之为死信队列。
消息变成死信一般是由于以下几种情况造成的:
-
1、消息过期,也就是笔者在上篇提到的 TTL。消息在队列的存活时间超过所设置的 TTL 时间。
-
2、消息被拒绝,调用了
channel.basicNack
或channel.basicReject
方法,井且设置requeue
参数为false
。 -
3、队列的消息达到最大长度。
DLX 也是一个正常的交换器,和一般的交换器没有区别,它能在任何的队列上被指定,实际上就是设置某个队列的属性。当这个队列中存在死信时 RabbitMQ
就会自动地将这个消息新发布到设置的 DLX 上去,进而被路由到另一个队列,即死信队列。可以监听这个队列中的消息、以进行相应的处理。
配置死信队列
在 channel.queueDeclare
方法中设置 x-dead-letter-exchange
参数来为这个队列添加 DLX。下面就第一种和第二种情况来介绍一下死信队列的配置与使用。
消息过期
添加配置
mq:
queueBinding:
queue: prod_queue_pay
dlQueue: dl-queue
exchange:
name: exchang_prod_pay
dlTopicExchange: dl-topic-exchange
key: prod_pay
dlRoutingKey: dl-routing-key
创建死信交换机、死信队列以及两者的绑定
@Value("${mq.queueBinding.exchange.dlTopicExchange}")
private String dlTopicExchange;
@Value("${mq.queueBinding.dlRoutingKey}")
private String dlRoutingKey;
@Value("${mq.queueBinding.dlQueue}")
private String dlQueue;
//创建死信交换机
@Bean
public TopicExchange dlTopicExchange(){
return new TopicExchange(dlTopicExchange,true,false);
}
//创建死信队列
@Bean
public Queue dlQueue(){
return new Queue(dlQueue,true);
}
//死信队列与死信交换机进行绑定
@Bean
public Binding BindingErrorQueueAndExchange(Queue dlQueue, TopicExchange dlTopicExchange){
return BindingBuilder.bind(dlQueue).to(dlTopicExchange).with(dlRoutingKey);
}
可以看到上述代码没什么特殊的,就普普通通的创建了一个交换机、队列,并将两者绑定在一起。
创建业务队列、业务交换机,以及两者的绑定
@Value("${mq.queueBinding.queue}")
private String queueName;
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
private final String dle = "x-dead-letter-exchange";
private final String dlk = "x-dead-letter-routing-key";
private final String ttl = "x-message-ttl";
/**
* 业务队列
* @return
*/
@Bean
public Queue payQueue(){
Map<String,Object> params = new HashMap<>();
//设置队列的过期时间
//队列中所有消息都有相同的过期时间
params.put(ttl,10000);
//声明当前队列绑定的死信交换机
params.put(dle,dlTopicExchange);
//声明当前队列的死信路由键 如果没有指定,则使用原队列的路由键:
params.put(dlk,dlRoutingKey);
return QueueBuilder.durable(queueName).withArguments(params).build();
}
@Bean
public TopicExchange payTopicExchange(){
return new TopicExchange(exchangeName,true,false);
}
//队列与交换机进行绑定
@Bean
public Binding BindingPayQueueAndPayTopicExchange(Queue payQueue, TopicExchange payTopicExchange){
return BindingBuilder.bind(payQueue).to(payTopicExchange).with(key);
}
上述重点代码为 业务队列
的创建。指定了队列的过期时间,并配置了死信队列。
生产者代码
/*
* 生产者
*/
@Component
@Slf4j
public class RabbitSender {
@Value("${mq.queueBinding.exchange.name}")
private String exchangeName;
@Value("${mq.queueBinding.key}")
private String key;
@Autowired
private RabbitTemplate rabbitTemplate;
public void send(String msg){
log.info("RabbitSender.send() msg = {}",msg);
// 将消息发送给业务交换机
rabbitTemplate.convertAndSend(exchangeName,key,msg);
}
}
提供对外方法
@Autowired
private RabbitSender rabbitSender;
@GetMapping
public void test(@RequestParam String msg){
rabbitSender.send(msg);
}
启动服务,可以看到同时创建了业务队列、业务交换机以及死信队列、死信交换机。而且可以看到业务队列上带了 DLX、DLK标签。 然后调用接口:http://localhost:8080/?msg=红红火火 ,消息会被发送到 prod_queue_pay
这个队列。
如果 10s 内没有消费者消费这条消息,那么判定这条消息为过期消息。由于设置了 DLX ,过期时消息被丢给 dlxExchange
交换机中,根据所配置的dlRoutingKey
找到与 dlxExchange
匹配的队列 dlQueue
后,消息被存储在 dlxQueue
这个死信队列中。
消息被拒绝
将业务队列的过期时间去掉。
/**
* 业务队列
* @return
*/
@Bean
public Queue payQueue(){
Map<String,Object> params = new HashMap<>();
//声明当前队列绑定的死信交换机
params.put(dle,dlTopicExchange);
//声明当前队列的死信路由键 如果没有指定,则使用原队列的路由键:
params.put(dlk,dlRoutingKey);
return QueueBuilder.durable(queueName).withArguments(params).build();
}
配置文件新增配置
rabbitmq:
listener:
simple:
#消息确认方式 manual 手动确认 auto 自动确认 none 不管
acknowledge-mode: manual
添加消费者
/**
* 消费者
*/
@Component
@Slf4j
public class RabbitReceiver {
//测试消费者进行消费发送异常 是否进入死信队列
@RabbitListener(queues = "${mq.queueBinding.queue}")
public void infoConsumption(String data, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
log.info("收到信息:{}",data);
boolean ack = false;
Exception exception = null;
try {
if(data.contains("888")){
throw new RuntimeException("信息敏感");
}
} catch (RuntimeException e) {
ack = true;
exception = e;
}
if (ack){
log.error("消息消费发生异常,error msg:{}", exception.getMessage(), exception);
//注意第三个参数需要为false
//true则重新放入原队列,否则丢弃或者进入死信队列。
channel.basicNack(tag, false, false);
} else {
//进行手动确认信息已经被消费
channel.basicAck(tag, false);
}
}
}
在上述代码中,channel.basicNack
方法如果将第三个参数设置为true
,将会出现无限重试问题。在下一篇中笔者将介绍怎么解决无限重试问题。
将之前的队列进行删除,然后进行重启,重新生成业务队列。可以看到业务队列
的标签少了 TTL。
请求接口:http://localhost:8080/?msg=888 因为带有敏感消息,业务就会发生异常,将该消息转发至死信队列中。
- 如你对本文有疑问或本文有错误之处,欢迎评论留言指出。如觉得本文对你有所帮助,欢迎点赞和关注。
点击链接加入群聊【Java进阶群】:领取《阿里巴巴内部Spring Cloud学习笔记!(超4W字,含阿里、字节跳动面试真题)》
最后,希望未来的我发展顺利,早日拿下p7!同样,也祝愿你实现自己的人生理想,愿我们都越来越好,共勉!
获取方式: 只需你**点赞+关注**后,Java进阶交流群:714827309 哦-!
获取方式: 只需你点赞+关注后,Java进阶交流群:714827309 进群拿资料哦-!
以上是关于RabbitMQ 死信队列的主要内容,如果未能解决你的问题,请参考以下文章