如何在 AMQP 中停止消费消息一段时间(时间不固定)?

Posted

技术标签:

【中文标题】如何在 AMQP 中停止消费消息一段时间(时间不固定)?【英文标题】:How to halt consuming messages in an AMQP for a certain period of time (where time is not fixed)? 【发布时间】:2022-01-17 08:14:42 【问题描述】:

我正在尝试在我的应用程序中实现以下场景:

    当我的应用程序启动时,来自传入交换的消息应该被传入队列使用。 如果发生任何异常/错误,消息将被定向到死信队列。 当我的应用程序正在停机时(我不想在那段时间使用消息),我会将消息重定向到 ParkingLot 队列。 当停机时间结束后,我想先消费 ParkingLot Queue 中的消息,然后使用 Incoming Queue 开始正常消费消息。

我的问题是:这些场景可以实现吗?在这里,我主要是在谈论第4步。如果是,有人可以指出我正确的方向吗?

我的第二个问题是:这是实现这种情况的正确方法吗?或者有没有更好的方法来实现它?

添加代码:

@RabbitListener(queues = "$com.rabbitmq.queueName", id="msgId")
    @RabbitListener(queues = "$com.rabbitmq.parkingQueueName", id="parkingId")
    public void consumeMessage(Message message) 
        try 
            log.info("Received message: ",new String(message.getBody()));
            
            //check if the application is down
            if(val) 
                registry.getListenerContainer("msgId").stop();
                rabbitTemplate.send(rabbitMQConfig.getExchange(), rabbitMQConfig.getParkingRoutingKey(), message);
            
        catch(Exception e) 
            rabbitTemplate.send(rabbitMQConfig.getDeadLetterExchange(), rabbitMQConfig.getDeadLetterRoutingKey(), message);
        
    

【问题讨论】:

【参考方案1】:

给每个@RabbitListener 一个id 属性。

然后使用RabbitListenerEndpointRegistry bean 来控制容器的生命周期。

registry.getListenerContainer(id).stop();

registry.getListenerContainer(id).start();

您可以将两个 @RabbitListener 注释放在同一个方法上。

【讨论】:

感谢@Gary 的回复。我在编辑的消息中提到的代码中尝试了一些东西。但是所有的消息都会进入交易信队列。你能告诉我我做错了什么吗? 也许在 catch 块中添加一些日志以查看抛出了什么异常?此外,您不应该在侦听器线程上停止容器 - 它会导致延迟“等待侦听器完成”;在另一个线程上运行stop()

以上是关于如何在 AMQP 中停止消费消息一段时间(时间不固定)?的主要内容,如果未能解决你的问题,请参考以下文章

暂时停止使用RabbitMQ消息并稍后恢复

Spring AMQP RPC使用者如何确定发布者通道是不是已关闭

RabbitMQ / AMQP 中的消息组

MassTransit:在消费者消费完所有消息后如何停止公共汽车?

Spring消息之AMQP.

rabbitMQ系列1:深入理解AMQP协议