使用RabbitMQ插件实现延迟队列
Posted 刘元涛
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用RabbitMQ插件实现延迟队列相关的知识,希望对你有一定的参考价值。
首先我们需要下载
rabbitmq_delayed_message_exchange 插件,这是一个 GitHub 上的开源项目,我们直接下载即可:
https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
选择适合自己的版本,放到插件目录,执行如下命令
rabbitmq-plugins enable rabbitmq_delayed_message_exchange
在Springboot项目里面配置自定义交换机
@Bean
CustomExchange customExchange()
Map<String, Object> args = new HashMap<>();
args.put("x-delayed-type", "direct");
return new CustomExchange(EXCHANGE_NAME, EXCHANGE_TYPE, true, false,args);
这里主要是交换机的定义有所不同,小伙伴们需要注意。
这里我们使用的交换机是 CustomExchange,这是一个 Spring 中提供的交换机,创建 CustomExchange 时有五个参数,含义分别如下:
- 交换机名称。
- 交换机类型,这个地方是固定的。
- 交换机是否持久化。
- 如果没有队列绑定到交换机,交换机是否删除。
- 其他参数。
最后一个 args 参数中,指定了交换机消息分发的类型,这个类型就是大家熟知的 direct、fanout、topic 以及 header 几种,用了哪种类型,将来交换机分发消息就按哪种方式来。
创建一个消费者:
@Component
public class MsgReceiver
private static final Logger logger = LoggerFactory.getLogger(MsgReceiver.class);
@RabbitListener(queues = RabbitConfig.QUEUE_NAME)
public void handleMsg(String msg)
logger.info("handleMsg,",msg);
使用单元测试创建一个生产者:
@SpringBootTest
class MqDelayedMsgDemoApplicationTests
@Autowired
RabbitTemplate rabbitTemplate;
@Test
void contextLoads() throws UnsupportedEncodingException
Message msg = MessageBuilder.withBody(("hello 江南一点雨"+new Date()).getBytes("UTF-8")).setHeader("x-delay", 3000).build();
rabbitTemplate.convertAndSend(RabbitConfig.EXCHANGE_NAME, RabbitConfig.QUEUE_NAME, msg);
在消息头中设置消息的延迟时间。
以上是关于使用RabbitMQ插件实现延迟队列的主要内容,如果未能解决你的问题,请参考以下文章