使用DelayExchange插件实现RabbitMQ延迟队列
Posted shuhaooo
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用DelayExchange插件实现RabbitMQ延迟队列相关的知识,希望对你有一定的参考价值。
1、安装DelayExchange插件
1.1、下载插件
RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html
下载与自己安装的RabbitMQ版本对应的DelayExchange插件,如3.8.9版本的插件对应RabbitMQ的3.8.5以上版本。
1.2、上传插件
1.2.1 进入RabbitMQ插件目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.10/plugins
如果是docker安装的RabbitMQ,使用下面命令查看数据卷,然后进入插件目录
docker volume inspect mq-plugins
1.2.2 将下载的插件上传到该目录
1.2.3 然后启动插件
docker需要进入RabbitMQ容器内部
docker exec -it 容器名 bash
启动插件命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
如果上传的版本不对这里会报错,重新下载上传合适的插件版本即可
最后重启RabbitMQ或容器
此时进入RabbitMQ管理页面可以看到能选择该类型,代表安装成功
2、声明DelayExchange交换机
//声明DelayExchange交换机,处理延迟消息
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "my.delay.queue", durable = "true"),
exchange = @Exchange(name = "my.delay.direct",delayed = "true"),
key = "delay"
))
public void listenDelayQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception
System.err.println("接收到的延迟消息:" + msg);
channel.basicAck(deliveryTag, false);//如果开启了手动确认机制,则需要手动ack
3、发送消息
发送消息时,一定要携带x-delay属性,指定延迟的时间
/**
* 发送延迟消息
* @param msg 待处理消息
* @param time 延迟时间ms
*/
public void sendDelayQueue(String msg,Long time)
// 创建消息
Message message = MessageBuilder
.withBody(msg.getBytes(StandardCharsets.UTF_8))
.setHeader("x-delay",time)
.build();
// 消息ID,需要封装到CorrelationData中
CorrelationData correlationData = new CorrelationData(IdUtils.simpleUUID());
// 发送消息
rabbitTemplate.convertAndSend("my.delay.direct", "delay", message, correlationData);
System.out.println("发送消息成功");
以上是关于使用DelayExchange插件实现RabbitMQ延迟队列的主要内容,如果未能解决你的问题,请参考以下文章
Java中动态声明与绑定Rabbit MQ队列以及延迟队列的实现与使用