使用DelayExchange插件实现RabbitMQ延迟队列

Posted shuhaooo

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用DelayExchange插件实现RabbitMQ延迟队列相关的知识,希望对你有一定的参考价值。

1、安装DelayExchange插件

1.1、下载插件

RabbitMQ有一个官方的插件社区,地址为:https://www.rabbitmq.com/community-plugins.html

下载与自己安装的RabbitMQ版本对应的DelayExchange插件,如3.8.9版本的插件对应RabbitMQ的3.8.5以上版本。

1.2、上传插件
1.2.1 进入RabbitMQ插件目录
cd /usr/lib/rabbitmq/lib/rabbitmq_server-3.10.10/plugins

如果是docker安装的RabbitMQ,使用下面命令查看数据卷,然后进入插件目录

docker volume inspect mq-plugins

1.2.2 将下载的插件上传到该目录
1.2.3 然后启动插件

docker需要进入RabbitMQ容器内部

docker exec -it 容器名 bash

启动插件命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange


如果上传的版本不对这里会报错,重新下载上传合适的插件版本即可
最后重启RabbitMQ或容器
此时进入RabbitMQ管理页面可以看到能选择该类型,代表安装成功

2、声明DelayExchange交换机

	//声明DelayExchange交换机,处理延迟消息
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "my.delay.queue", durable = "true"),
            exchange = @Exchange(name = "my.delay.direct",delayed = "true"),
            key = "delay"
    ))
    public void listenDelayQueue(String msg, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception
        System.err.println("接收到的延迟消息:" + msg);
        channel.basicAck(deliveryTag, false);//如果开启了手动确认机制,则需要手动ack

3、发送消息

发送消息时,一定要携带x-delay属性,指定延迟的时间

    /**
     * 发送延迟消息
     * @param msg 待处理消息
     * @param time 延迟时间ms
     */
    public void sendDelayQueue(String msg,Long time)
        // 创建消息
        Message message = MessageBuilder
                .withBody(msg.getBytes(StandardCharsets.UTF_8))
                .setHeader("x-delay",time)
                .build();
        // 消息ID,需要封装到CorrelationData中
        CorrelationData correlationData = new CorrelationData(IdUtils.simpleUUID());
        // 发送消息
        rabbitTemplate.convertAndSend("my.delay.direct", "delay", message, correlationData);
        System.out.println("发送消息成功");
    

以上是关于使用DelayExchange插件实现RabbitMQ延迟队列的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 实现消息队列延迟

介绍一款自己实现的rabbit轻量级组件和使用方法

Java中动态声明与绑定Rabbit MQ队列以及延迟队列的实现与使用

Java中动态声明与绑定Rabbit MQ队列以及延迟队列的实现与使用

rabbit_mq实现分布式事务

Rabbit MQ