使用RabbitMQ插件实现延迟队列

Posted 刘元涛

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用RabbitMQ插件实现延迟队列相关的知识,希望对你有一定的参考价值。

首先我们需要下载
rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上的开源项目,我们直接下载即可:

https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

选择适合自己的版本,放到插件目录,执行如下命令

rabbitmq-plugins enable rabbitmq_delayed_message_exchange

在Springboot项目里面配置自定义交换机

@Bean
CustomExchange customExchange() 
    Map<String, Object> args = new HashMap<>();
    args.put("x-delayed-type", "direct");
    return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false,args);

这里主要是交换机的定义有所不同,小伙伴们需要注意。

这里我们使用的交换机是 CustomExchange,这是一个 Spring 中提供的交换机,创建 CustomExchange 时有五个参数,含义分别如下:

  • 交换机名称。
  • 交换机类型,这个地方是固定的。
  • 交换机是否持久化。
  • 如果没有队列绑定到交换机,交换机是否删除。
  • 其他参数。

最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种,用了哪种类型,将来交换机分发消息就按哪种方式来。

创建一个消费者:

@Component
public class MsgReceiver 
    private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
    @RabbitListener(queues = RabbitConfig.QUEUE_NAME)
    public void handleMsg(String msg) 
        logger.info("handleMsg,",msg);
    

使用单元测试创建一个生产者:

@SpringBootTest
class MqDelayedMsgDemoApplicationTests 

    @Autowired
    RabbitTemplate rabbitTemplate;
    @Test
    void contextLoads() throws UnsupportedEncodingException 
        Message msg = MessageBuilder.withBody(("hello 江南一点雨"+new Date()).getBytes("UTF-8")).setHeader("x-delay", 3000).build();
        rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.QUEUE_NAME, msg);
    

在消息头中设置消息的延迟时间。

以上是关于使用RabbitMQ插件实现延迟队列的主要内容,如果未能解决你的问题,请参考以下文章

使用RabbitMQ插件实现延迟队列

使用RabbitMQ插件实现延迟队列

使用RabbitMQ插件实现延迟队列

使用RabbitMQ插件实现延迟队列

Docker版RabbitMQ安装延迟队列插件及延迟队列项目应用实战

RabbitMQ---延迟队列,整合springboot