SpringBoot RabbitMQ 延迟队列代码实现

Posted therhyme

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringBoot RabbitMQ 延迟队列代码实现相关的知识,希望对你有一定的参考价值。

场景

用户下单后,如果30min未支付,则删除该订单,这时候就要可以用延迟队列

 

准备

利用rabbitmq_delayed_message_exchange插件;

首先下载该插件:https://www.rabbitmq.com/community-plugins.html

技术图片

然后把该插件放到rabbitmq安装目录plugins下;

进入到sbin目录下,执行"rabbitmq-plugins.bat enable rabbitmq_delayed_message_exchange";

技术图片

 

关闭RabbitMQ服务,然后再启动(直接重启该插件可能会不生效)。

 

SpringBoot RabbitMQ代码

application.properties配置文件

spring.application.name=spring-boot-rabbitmq
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=nut
spring.rabbitmq.password=nut

 

配置类

注意这里的"x-delayed-type"和"x-delayed-message"

/**
 * 延迟队列配置exchange
 */
@Configuration
public class DelayQueueConfig 

    public static final String DELAY_EXCHANGE = "DELAY_EXCHANGE";
    public static final String DELAY_QUEUE = "DELAY_QUEUE";
    public static final String DELAY_ROUTING_KEY = "DELAY_ROUTING_KEY";

    @Bean("delayExchange")
    public Exchange delayExchange() 
        Map<String, Object> args = new HashMap<>(1);
//       x-delayed-type    声明 延迟队列Exchange的类型
        args.put("x-delayed-type", "direct");
        return new CustomExchange(DELAY_EXCHANGE, "x-delayed-message",true, false,args);
    

    @Bean("delayQueue")
    public Queue delayQueue() 
        return QueueBuilder.durable(DELAY_QUEUE).build();
    

    /**
     * 将延迟队列通过routingKey绑定到延迟交换器
     *
     * @return
     */
    @Bean
    public Binding delayQueueBindExchange() 
        return new Binding(DELAY_QUEUE, Binding.DestinationType.QUEUE, DELAY_EXCHANGE, DELAY_ROUTING_KEY, null);
    

 

生产者

发送消息时,指定延迟的毫秒

/**
 * 延迟队列发送者
 */
@Component
@Slf4j
public class DelayQueueSender 

    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendDelayQueue(int number) 
        log.warn("延迟队列发送 :  milliseconds", number);
        // 这里的Exchange可以是业务的Exchange,为了方便测试这里直接往死信Exchange里投递消息
        rabbitTemplate.convertAndSend(
                DelayQueueConfig.DELAY_EXCHANGE,
                DelayQueueConfig.DELAY_ROUTING_KEY,
                number, (message) -> 
                    // 设置延迟的毫秒数
                    message.getMessageProperties().setDelay(number);
                    log.info("Now : ", ZonedDateTime.now());
                    return message;
                );
    

 

消费者

/**
 * 延迟队列消费者
 */
@Component
@Slf4j
@RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE)
public class DelayQueueConsumer 

    @RabbitHandler
    public void receiveDelayMessage(Integer milliseconds)
        log.warn("DelayQueueConsumer Time : , and the millis : ", ZonedDateTime.now(), milliseconds);

    

 

测试

先启动项目;

然后在测试类中发送消息;

@RunWith(SpringRunner.class)
@SpringBootTest
public class RabbitmqApplicationTests 

    @Autowired
    private DelayQueueSender delayQueueSender;


    @Test
    public void testDelayQueueSender()
        delayQueueSender.sendDelayQueue(5000);
    

 

发送消息窗口:

技术图片

 

消费者受到消息:

技术图片

 

时间间隔证明延迟队列发送完成!

 

参考:

https://blog.csdn.net/linsongbin1/article/details/80178122

https://blog.csdn.net/youjin/article/details/82586888

https://docs.spring.io/spring-amqp/docs/2.0.0.M2/reference/htmlsingle/#delayed-message-exchange

https://www.rabbitmq.com/blog/2015/04/16/scheduling-messages-with-rabbitmq/

 

以上是关于SpringBoot RabbitMQ 延迟队列代码实现的主要内容,如果未能解决你的问题,请参考以下文章

SpringBoot RabbitMQ 延迟队列代码实现

[SpringBoot] Spring Boot(14)RabbitMQ延迟队列

SpringBoot集成RabbitMQ之死信队列限流队列延迟队列(第四节)

Rabbitmq之整合Springboot,普通队列以及死信队列demo实例,队列优化以及插件实现延迟队列

RabbitMQ:延迟队列

RabbitMQ延迟队列