rabbitmq安装延时队列插件实现延时队列

Posted 好大的月亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq安装延时队列插件实现延时队列相关的知识,希望对你有一定的参考价值。

下载插件地址

要注意和自己的rabbitmq的版本对应起来
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

我的mq是docker安装的3.9.7的

下载完之后把插件copymqplugin目录下,然后启用插件。之后重启容器,我这里是docker-compose安装的

rabbitmq-plugins enable rabbitmq_delayed_message_exchange
docker-compose restart


进入rabbitmq管理页面查看插件是否安装成功

Type里面查看是否有x-delayed-message选项,如果存在就代表插件安装成功。

使用mq延时队列插件下springboot实现延时队列

yaml配置mq,然后在mq管理页面创建虚拟host:fchan

spring:
  rabbitmq:
    host: 110.40.181.73
    port: 35672
    username: root
    password: 10086
    virtual-host: /fchan

配置延时队列和延时交换机的绑定

package com.fchan.mq.mqDelay;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.util.HashMap;
import java.util.Map;

@Configuration
public class MqDelayConfig {

    //最后经过死信队列转发后实际消费的交换机
    private static final String EXCHANGE_NAME = "delayed_exchange";
    //最后经过死信队列转发后实际消费的队列
    private static final String QUEUE_NAME = "delayed_queue";
    //最后经过死信队列转发后实际消费的路由key
    private static final String ROUTE_KEY = "delayed_key";

    /**
     * 交换机
     */
    @Bean
    CustomExchange exchange() {
        //通过x-delayed-type参数设置fanout /direct / topic / header 类型
        Map<String, Object> args = new HashMap<>();
        args.put("x-delayed-type", "direct");
        return new CustomExchange(EXCHANGE_NAME, "x-delayed-message",true, false,args);
    }

    /**
     * 队列
     */
    @Bean
    public Queue queue() {
        return new Queue(QUEUE_NAME,true,false,false);
    }

    /**
     * 将队列绑定到交换机
     */
    @Bean
    public Binding binding(CustomExchange exchange, Queue queue) {
        return BindingBuilder
                .bind(queue)
                .to(exchange)
                .with(ROUTE_KEY)
                .noargs();
    }
}

消息生产者

package com.fchan.mq.mqDelay;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;


@Component
public class MyRabbitSender {

    Logger log = LoggerFactory.getLogger(MyRabbitSender.class);

    private static final String ROUTE_KEY = "delayed_key";
    private static final String EXCHANGE_NAME = "delayed_exchange";

    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * @param msg   消息
     * @param delay 延时时间,秒
     */
    public void send2(String msg, int delay) {
        log.info("RabbitSender.send() msg = {}", msg);
        rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, msg, message -> {
            message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);  //消息持久化
            message.getMessageProperties().setDelay(delay * 1000);   // 单位为毫秒
            return message;
        });
    }
}

消息消费者

package com.fchan.mq.mqDelay;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MyRabbitConsume {
    Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);

    @RabbitListener(queues = "delayed_queue")
    public void infoConsumption(String data) throws Exception {
        log.info("收到信息:{}",data);
        log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");
    }
}

参考了大佬的博文
https://juejin.cn/post/6977516798828609567#heading-13

以上是关于rabbitmq安装延时队列插件实现延时队列的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ---延迟队列,整合springboot

RabbitMQ 安装部署(New)& 延时队列使用

RabbitMQ 安装部署(New)& 延时队列使用

RabbitMQ:伪延时队列

RabbitMQ实现延时队列

RabbitMq 实现延时队列-Springboot版本