使用RabbitMQ的死信队列实现延迟消息

Posted 刘元涛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用RabbitMQ的死信队列实现延迟消息相关的知识,希望对你有一定的参考价值。

使用场景

  • 订单下单30分钟后,如果用户没有付款,则系统自动取消订单。
  • 会议开始前10分钟,推送消息提醒用户。
  • 自定义某个操作的执行时间,如果设置文章在明早9点发布。

除了上一篇《使用RabbitMQ插件实现延迟队列》外,使用死信队列也是一种方案.

  • Time To Live:可以在发送消息时设置过期时间,也可以设置整个队列的过期时间,如果两个同时设置已最早过期时间为准。
  • Dead Letter Exchanges:可以通过绑定队列的死信交换器来实现死信队列。
x-dead-letter-exchange:绑定死信交换器(其实也是普通交换器,与类型无关)
x-dead-letter-routing-key:绑定死信队列的路由键(可选)
x-message-ttl:绑定队列消息的过期时间(可选)

 死信队列设计思路:

生产者 --> 消息 --> 交换机 --> 队列 --> 变成死信 --> DLX交换机 -->队列 --> 消费者

进入消息队列:
1. 消息被拒绝,并且requeue= false
2. 消息ttl过期
3. 队列达到最大的长度

做延迟队列需要创建一个没有消费者的队列,用来存储消息。然后创建一个真正的消费队列,用来做具体的业务逻辑。当带有TTL的消息到达绑定死信交换器的队列,因为没有消费者所以会一直等到消息过期,然后消息被投递到死信队列也就是真正的消费队列。 具体代码:
package com.lyt.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.stereotype.Component;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description 利用死信队列和过期时间模拟延迟队列,没有消费者,所以不能用注解形式
 * Time To Live(TTL)
 * 1. 可以在发送消息时设置过期时间(message.getMessageProperties().setExpiration("5000");)
 * 2. 也可以设置整个队列的过期时间(args.put("x-message-ttl",10000);)
 * 3. 如果两个同时设置已最早过期时间为准
 * Dead Letter Exchanges(DLX)
 * @Date 2019-03-10 10:25:30
 */
@Component
public class MQDelayConfig 

    /**
     * @Description 定义支付交换器
     * @Author lyt
     * @Date 2021-04-02 14:39:31
     */
    @Bean
    private DirectExchange directPayExchange() 
        return new DirectExchange("direct.pay.exchange");
    

    /**
     * @Description 定义支付队列 绑定死信队列(其实是绑定的交换器,然后通过交换器路由键绑定队列) 设置过期时间
     * @Author lyt
     * @Date 2021-04-02 14:40:24
     */
    @Bean
    private Queue directPayQueue() 
        Map<String, Object> args = new HashMap<>(3);
        //声明死信交换器
        args.put("x-dead-letter-exchange", "direct.delay.exchange");
        //声明死信路由键
        args.put("x-dead-letter-routing-key", "DelayKey");
        //声明队列消息过期时间
        args.put("x-message-ttl", 10000);
        return new Queue("direct.pay.queue", true, false, false, args);
    

    /**
     * @Description 定义支付绑定
     * @Author lyt
     * @Date 2021-04-02 14:46:10
     */
    @Bean
    private Binding bindingOrderDirect() 
        return BindingBuilder.bind(directPayQueue()).to(directPayExchange()).with("OrderPay");
    

带有过期时间且绑定死信交换器的队列

生产者,为消息设置过期时间setExpiration("15000");

/**
 * @Description 支付队列、绑定死信队列,测试消息延迟功能
 * @Author lyt
 * @Date 2021-04-02 14:07:25
 */
@RequestMapping(value = "/directDelayMQ", method = RequestMethod.GET)
public List<User> directDelayMQ() 
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    List<User> users = userService.getUserList(null);
    for (User user : users) 
        CorrelationData correlationData = new CorrelationData(String.valueOf(user.getId()));
        rabbitTemplate.convertAndSend("direct.pay.exchange", "OrderPay", user,
                message -> 
                    // 设置5秒过期
                    message.getMessageProperties().setExpiration("15000");
                    return message;
                ,
                correlationData);
        System.out.println(user.getName() + ":" + sdf.format(new Date()));
    
    return users;

消费者,声明真正消费的队列、交换器、绑定

/**
 * @Description 延迟队列
 * @Author lyt
 * @Date 2021-04-04 16:34:28
 */
@RabbitListener(bindings = @QueueBinding(value = @Queue(value = "direct.delay.queue"), exchange = @Exchange(value = "direct.delay.exchange"), key = "DelayKey"))
public void getDLMessage(User user, Channel channel, Message message) throws InterruptedException, IOException 
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    // 模拟执行任务
    System.out.println("这是延迟队列消费:" + user.getName() + ":" + sdf.format(new Date()));
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);

思考: 如果先放入一条A消息过期时间是10秒,再放入一个b消息过期时间是5秒,那延迟队列是否可以先消费b消息?

答案是否定的,因为队列就会遵循先进先出的规则,b消息会等a消息过期后,一起消费,这就是所谓的队列阻塞。由这个问题可以用我们之前介绍的插件方式解决。

以上是关于使用RabbitMQ的死信队列实现延迟消息的主要内容,如果未能解决你的问题,请参考以下文章

使用RabbitMQ的死信队列实现延迟消息

使用RabbitMQ的死信队列实现延迟消息

使用RabbitMQ的死信队列实现延迟消息

RabbitMQ实现延迟发送消息

MQ-死信队列实现消息延迟

消息队列 - 死信、延迟、重试队列