使用RabbitMQ的死信队列实现延迟消息
Posted 刘元涛
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用RabbitMQ的死信队列实现延迟消息相关的知识,希望对你有一定的参考价值。
使用场景
- 订单下单30分钟后,如果用户没有付款,则系统自动取消订单。
- 会议开始前10分钟,推送消息提醒用户。
- 自定义某个操作的执行时间,如果设置文章在明早9点发布。
除了上一篇《使用RabbitMQ插件实现延迟队列》外,使用死信队列也是一种方案.
- Time To Live:可以在发送消息时设置过期时间,也可以设置整个队列的过期时间,如果两个同时设置已最早过期时间为准。
- Dead Letter Exchanges:可以通过绑定队列的死信交换器来实现死信队列。
x-dead-letter-exchange:绑定死信交换器(其实也是普通交换器,与类型无关)
x-dead-letter-routing-key:绑定死信队列的路由键(可选)
x-message-ttl:绑定队列消息的过期时间(可选)
死信队列设计思路:
生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者
进入消息队列:
1. 消息被拒绝,并且requeue= false
2. 消息ttl过期
3. 队列达到最大的长度
做延迟队列需要创建一个没有消费者的队列,用来存储消息。然后创建一个真正的消费队列,用来做具体的业务逻辑。当带有TTL的消息到达绑定死信交换器的队列,因为没有消费者所以会一直等到消息过期,然后消息被投递到死信队列也就是真正的消费队列。
具体代码:
package com.lyt.rabbitmq.config;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;
import java.util.HashMap;
import java.util.Map;
/**
* @Description 利用死信队列和过期时间模拟延迟队列,没有消费者,所以不能用注解形式
* Time To Live(TTL)
* 1. 可以在发送消息时设置过期时间(message.getMessageProperties().setExpiration("5000");)
* 2. 也可以设置整个队列的过期时间(args.put("x-message-ttl",10000);)
* 3. 如果两个同时设置已最早过期时间为准
* Dead Letter Exchanges(DLX)
* @Date 2019-03-10 10:25:30
*/
@Component
public class MQDelayConfig
/**
* @Description 定义支付交换器
* @Author lyt
* @Date 2021-04-02 14:39:31
*/
@Bean
private DirectExchange directPayExchange()
return new DirectExchange("direct.pay.exchange");
/**
* @Description 定义支付队列 绑定死信队列(其实是绑定的交换器,然后通过交换器路由键绑定队列) 设置过期时间
* @Author lyt
* @Date 2021-04-02 14:40:24
*/
@Bean
private Queue directPayQueue()
Map<String, Object> args = new HashMap<>(3);
//声明死信交换器
args.put("x-dead-letter-exchange", "direct.delay.exchange");
//声明死信路由键
args.put("x-dead-letter-routing-key", "DelayKey");
//声明队列消息过期时间
args.put("x-message-ttl", 10000);
return new Queue("direct.pay.queue", true, false, false, args);
/**
* @Description 定义支付绑定
* @Author lyt
* @Date 2021-04-02 14:46:10
*/
@Bean
private Binding bindingOrderDirect()
return BindingBuilder.bind(directPayQueue()).to(directPayExchange()).with("OrderPay");
带有过期时间且绑定死信交换器的队列
生产者,为消息设置过期时间setExpiration("15000");
/**
* @Description 支付队列、绑定死信队列,测试消息延迟功能
* @Author lyt
* @Date 2021-04-02 14:07:25
*/
@RequestMapping(value = "/directDelayMQ", method = RequestMethod.GET)
public List<User> directDelayMQ()
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
List<User> users = userService.getUserList(null);
for (User user : users)
CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
rabbitTemplate.convertAndSend("direct.pay.exchange", "OrderPay", user,
message ->
// 设置5秒过期
message.getMessageProperties().setExpiration("15000");
return message;
,
correlationData);
System.out.println(user.getName() + ":" + sdf.format(new Date()));
return users;
消费者,声明真正消费的队列、交换器、绑定
/**
* @Description 延迟队列
* @Author lyt
* @Date 2021-04-04 16:34:28
*/
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct.delay.queue"), exchange = @Exchange(value = "direct.delay.exchange"), key = "DelayKey"))
public void getDLMessage(User user, Channel channel, Message message) throws InterruptedException, IOException
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// 模拟执行任务
System.out.println("这是延迟队列消费:" + user.getName() + ":" + sdf.format(new Date()));
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
思考: 如果先放入一条A消息过期时间是10秒,再放入一个b消息过期时间是5秒,那延迟队列是否可以先消费b消息?
答案是否定的,因为队列就会遵循先进先出的规则,b消息会等a消息过期后,一起消费,这就是所谓的队列阻塞。由这个问题可以用我们之前介绍的插件方式解决。
以上是关于使用RabbitMQ的死信队列实现延迟消息的主要内容,如果未能解决你的问题,请参考以下文章