如何为RabbitMQ和Spring动态注册队列及其独占的使用者/监听器?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何为RabbitMQ和Spring动态注册队列及其独占的使用者/监听器?相关的知识,希望对你有一定的参考价值。

我有以下问题需要解决:我正在尝试使用RabbitMQ消息传递实现一个简单的延迟重试机制。我有一个基础设施,让我延迟传递信息。我可以拥有任何想要在运行时利用这种延迟重试机制的感兴趣的参与者。

参与者只想向我提供2个详细信息和消息:1。队列名称,他们希望在延迟说T秒之后传递消息。 2.队列的消费者(比如消息的消费者。)

我试图做以下事情:

private void startSeparateListener(final Object messageConsumer, 
                                   final Queue queue) {
        SimpleMessageListenerContainer simpleMessageListenerContainer 
                    = myCustomeSimpleMessageListenerFactory.create();
        simpleMessageListenerContainer.setRabbitAdmin(rabbitAdmin);
        simpleMessageListenerContainer.setQueues(queue);
        simpleMessageListenerContainer.setMessageListener(new MessageListenerAdapter(messageConsumer));
        simpleMessageListenerContainer.start();

    }

请注意,已经使用rabbitadmin创建并注册了队列,并且对象使用者有一个名为handleMessage的方法来侦听队列。

这是在运行时与队列的消息使用者动态注册队列的正确方法吗?

注意:Spring已经提供了一个SimpleMessageListenerContainer类型的bean,但是使用bean来动态地添加队列和使用者会导致意外消费者的问题,即Q1作为另一个队列的接收者的一部分被调用,其中Q2的内容类型可能是相同的第一季度?

我试过很多关于它的搜索,但无法得到任何具体的解释。如果这是一个重复的问题和任何天真,请事先道歉。

答案

我无法以某种方式编译你的问题,但我可以说,在RabbitMQ中已经有Dealyed Exchange解决方案并且很好地使用了AMQP supports

我建议远离动态添加的SimpleMessageListenerContainer:它不是那么简单。有像addQueueNames()这样的选项:

/**
 * Add queue(s) to this container's list of queues. The existing consumers
 * will be cancelled after they have processed any pre-fetched messages and
 * new consumers will be created. The queue must exist to avoid problems when
 * restarting the consumers.
 * @param queueName The queue to add.
 */
@Override
public void addQueueNames(String... queueName) {

因此,您可以考虑不添加新容器,而是向现有容器添加新队列。 amqp_consumerQueue的下游路由可能有助于区分不同队列的消息。

以上是关于如何为RabbitMQ和Spring动态注册队列及其独占的使用者/监听器?的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot(十四)RabbitMQ延迟队列

Spring Boot(十四)RabbitMQ延迟队列

Spring Boot系列——RabbitMQ确认退回模式及死信队列

[SpringBoot] Spring Boot(14)RabbitMQ延迟队列

RabbitMQ及Spring集成

远程机器未运行时如何为远程 JMS 队列初始化 ConnectionFactory?