如何在 AMQP 中停止消费消息一段时间(时间不固定)?
Posted
技术标签:
【中文标题】如何在 AMQP 中停止消费消息一段时间(时间不固定)?【英文标题】:How to halt consuming messages in an AMQP for a certain period of time (where time is not fixed)? 【发布时间】:2022-01-17 08:14:42 【问题描述】:我正在尝试在我的应用程序中实现以下场景:
-
当我的应用程序启动时,来自传入交换的消息应该被传入队列使用。
如果发生任何异常/错误,消息将被定向到死信队列。
当我的应用程序正在停机时(我不想在那段时间使用消息),我会将消息重定向到 ParkingLot 队列。
当停机时间结束后,我想先消费 ParkingLot Queue 中的消息,然后使用 Incoming Queue 开始正常消费消息。
我的问题是:这些场景可以实现吗?在这里,我主要是在谈论第4步。如果是,有人可以指出我正确的方向吗?
我的第二个问题是:这是实现这种情况的正确方法吗?或者有没有更好的方法来实现它?
添加代码:
@RabbitListener(queues = "$com.rabbitmq.queueName", id="msgId")
@RabbitListener(queues = "$com.rabbitmq.parkingQueueName", id="parkingId")
public void consumeMessage(Message message)
try
log.info("Received message: ",new String(message.getBody()));
//check if the application is down
if(val)
registry.getListenerContainer("msgId").stop();
rabbitTemplate.send(rabbitMQConfig.getExchange(), rabbitMQConfig.getParkingRoutingKey(), message);
catch(Exception e)
rabbitTemplate.send(rabbitMQConfig.getDeadLetterExchange(), rabbitMQConfig.getDeadLetterRoutingKey(), message);
【问题讨论】:
【参考方案1】:给每个@RabbitListener
一个id
属性。
然后使用RabbitListenerEndpointRegistry
bean 来控制容器的生命周期。
registry.getListenerContainer(id).stop();
和
registry.getListenerContainer(id).start();
您可以将两个 @RabbitListener
注释放在同一个方法上。
【讨论】:
感谢@Gary 的回复。我在编辑的消息中提到的代码中尝试了一些东西。但是所有的消息都会进入交易信队列。你能告诉我我做错了什么吗? 也许在 catch 块中添加一些日志以查看抛出了什么异常?此外,您不应该在侦听器线程上停止容器 - 它会导致延迟“等待侦听器完成”;在另一个线程上运行stop()
。以上是关于如何在 AMQP 中停止消费消息一段时间(时间不固定)?的主要内容,如果未能解决你的问题,请参考以下文章
Spring AMQP RPC使用者如何确定发布者通道是不是已关闭