RabbitMQ 挂起队列消费
Posted
技术标签:
【中文标题】RabbitMQ 挂起队列消费【英文标题】:RabbitMQ suspend queue consumption 【发布时间】:2013-02-24 08:28:15 【问题描述】:保持持久队列及其绑定但暂停其消费者的最佳方法是什么?
用例是:如果我们不断收到一堆我们无法处理的消息(例如数据库已关闭或架构问题)但想保留,我想“让它崩溃”并停止处理消息聚合到队列中。即允许发布但暂停消费。
我能想到三个解决方案:
-
我可以让绑定到队列的所有消费者不断拒绝消息并重新排队,但这有点浪费资源,更不用说我已经以编程方式执行上述逻辑了。
我可以在所有消费者上调用
basic.cancelConsumer
(见下文)
或者就spring-amqp而言,我想我可以在绑定到队列的所有SimpleMessageListenerContainers上调用shutdown
。
#1
我们已经在做,因为邮件被拒绝了。问题是这最终就像一个无限循环的失败,如果你记录这个失败,就会浪费更多的资源。
#3
似乎很理想,但我必须了解所有消息侦听器,然后通知它们关闭。我想我可以使用扇出交换来通知队列需要暂停。我觉得 RabbitMQ 必须为这个逻辑内置一些东西。另一个问题是您可以将多个队列绑定到一个消息容器(并非所有队列都可能需要暂停)。
对于#2
,我知道我可以使用cancel the consumer 和consumerTag
,但问题(假设这是执行上述操作的正确方法)是我从哪里获得consumerTag
s 的列表到队列中?
【问题讨论】:
【参考方案1】:如果您需要停止消费者,那么只需调用基本取消即可。
拥有持久队列是您在声明队列时解决的问题:durable=true
auto_delete=false
。
在发布消息时确定是否具有持久性消息:delivery_mode=2.
每个消费者都有自己的消费者标签。每当您从队列中获得消息时,信封应该有消费者标签,您应该使用该标签调用基本取消。
AFAIK,您不能在 conn 1 中有消费者 A,并在不同的连接上为该消费者调用基本取消。这个我可能错了。
【讨论】:
谢谢。我知道如何像上面一样在兔子中声明队列。看起来我必须自己管理我的消费者,这是我已经走的路。如果 rabbit 可以选择管理员来暂停队列的消费,那就太好了。 你唯一能做的就是从管理员那里强制关闭它,这并不是你想要的【参考方案2】:此解决方案是 spring-amqp 特定的。我的回答基本上是#3
。
我维护一个具有队列名称Map<String,CustomSimpleMessageListenerContainer>
到自定义扩展SimpleMessageListenerContainer
的服务。
在出现一定数量的异常后,我会发送一条“恐慌”消息,该消息会进入服务接收到的特殊队列以关闭消费者。
【讨论】:
使用辅助队列传递的 panic 是个好主意!以上是关于RabbitMQ 挂起队列消费的主要内容,如果未能解决你的问题,请参考以下文章
使用Java模拟消费者是如何消费rabbitMQ消息队列中的消息的