rabbitmq安装延时队列插件实现延时队列
Posted 好大的月亮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq安装延时队列插件实现延时队列相关的知识,希望对你有一定的参考价值。
下载插件地址
要注意和自己的rabbitmq的版本对应起来
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
我的mq是docker安装的3.9.7的
下载完之后把插件copy
到mq
的plugin
目录下,然后启用插件。之后重启容器,我这里是docker-compose
安装的
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
docker-compose restart
进入rabbitmq管理页面查看插件是否安装成功
在Type
里面查看是否有x-delayed-message
选项,如果存在就代表插件安装成功。
使用mq延时队列插件下springboot实现延时队列
yaml配置mq,然后在mq管理页面创建虚拟host:fchan
spring:
rabbitmq:
host: 110.40.181.73
port: 35672
username: root
password: 10086
virtual-host: /fchan
配置延时队列和延时交换机的绑定
package com.fchan.mq.mqDelay;
import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.CustomExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class MqDelayConfig {
//最后经过死信队列转发后实际消费的交换机
private static final String EXCHANGE_NAME = "delayed_exchange";
//最后经过死信队列转发后实际消费的队列
private static final String QUEUE_NAME = "delayed_queue";
//最后经过死信队列转发后实际消费的路由key
private static final String ROUTE_KEY = "delayed_key";
/**
* 交换机
*/
@Bean
CustomExchange exchange() {
//通过x-delayed-type参数设置fanout /direct / topic / header 类型
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(EXCHANGE_NAME, "x-delayed-message",true, false,args);
}
/**
* 队列
*/
@Bean
public Queue queue() {
return new Queue(QUEUE_NAME,true,false,false);
}
/**
* 将队列绑定到交换机
*/
@Bean
public Binding binding(CustomExchange exchange, Queue queue) {
return BindingBuilder
.bind(queue)
.to(exchange)
.with(ROUTE_KEY)
.noargs();
}
}
消息生产者
package com.fchan.mq.mqDelay;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.MessageDeliveryMode;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
@Component
public class MyRabbitSender {
Logger log = LoggerFactory.getLogger(MyRabbitSender.class);
private static final String ROUTE_KEY = "delayed_key";
private static final String EXCHANGE_NAME = "delayed_exchange";
@Autowired
private RabbitTemplate rabbitTemplate;
/**
* @param msg 消息
* @param delay 延时时间,秒
*/
public void send2(String msg, int delay) {
log.info("RabbitSender.send() msg = {}", msg);
rabbitTemplate.convertAndSend(EXCHANGE_NAME, ROUTE_KEY, msg, message -> {
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT); //消息持久化
message.getMessageProperties().setDelay(delay * 1000); // 单位为毫秒
return message;
});
}
}
消息消费者
package com.fchan.mq.mqDelay;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;
@Component
public class MyRabbitConsume {
Logger log = LoggerFactory.getLogger(MyRabbitConsume.class);
@RabbitListener(queues = "delayed_queue")
public void infoConsumption(String data) throws Exception {
log.info("收到信息:{}",data);
log.info("然后进行一系列逻辑处理 Thanks♪(・ω・)ノ");
}
}
参考了大佬的博文
https://juejin.cn/post/6977516798828609567#heading-13
以上是关于rabbitmq安装延时队列插件实现延时队列的主要内容,如果未能解决你的问题,请参考以下文章