在 RabbitMQ 中手动确认消息

Posted

技术标签:

【中文标题】在 RabbitMQ 中手动确认消息【英文标题】:Manually ack messages in RabbitMQ 【发布时间】:2015-06-21 08:58:07 【问题描述】:

以前我正在阅读队列中存在的所有消息,但现在我必须根据用户的选择(计数)返回特定数量的消息。

我尝试相应地更改 for 循环,但由于自动确认,它会读取所有消息。所以我尝试在配置文件中将其更改为手动。

在我的程序中,如何在阅读 msg 后手动确认消息(目前我正在使用 AmqpTemplate 接收并且我没有频道参考)?

    Properties properties = admin.getQueueProperties("queue_name");
    if(null != properties)
    
        Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());          
        while(messageCount > 0)
        
            Message msg = amqpTemplate.receive(queue_name);
            String value = new String(msg.getBody());
            
            valueList.add(value);
            messageCount--;
        

任何帮助都非常感谢,在此先感谢。

【问题讨论】:

AmqpTemplate#receive 自动确认消息,除非通道被交易。要控制确认,您可以使用AmqpTemplate#execute 并手动接收,或者最好的方法是使用SimpleMessageListenerContainer 甚至BlockingQueueConsumer @NicolasLabrot 我没有在 AmqpTemplate 中找到执行方法,你指的是别的东西吗?是的,我确实在 SimpleMessageListenerContainer 中将 setAcknowledgeMode 设置为 MANUAL。 对不起,我指的是RabbitTemplate#execute,它是AmqpTemplate的实现 @NicolasLabrot 您能否对此有所了解。什么是 ChannelCallback,看起来我需要一个我没有的频道参考。 看看RabbitTemplate#receive code 但我不认为这是正确的方法。 【参考方案1】:

您不能使用 receive() 方法手动确认 - 使用 SimpleMessageListenerContainer 用于带有手动确认和 ChannelAwareMessageListener 的事件驱动消费者。或者,使用模板的 execute() 方法,它可以让您访问 Channel - 但是您将使用较低级别的 RabbitMQ API,而不是 Message 抽象。

编辑:

您需要学习底层的 RabbitMQ Java API 才能使用执行,但是这样的东西会起作用...

    final int messageCount = 3;
    boolean result = template.execute(new ChannelCallback<Boolean>() 

        @Override
        public Boolean doInRabbit(final Channel channel) throws Exception 
            int n = messageCount;
            channel.basicQos(messageCount); // prefetch
            long deliveryTag = 0;
            while (n > 0) 
                GetResponse result = channel.basicGet("si.test.queue", false);
                if (result != null) 
                    System.out.println(new String(result.getBody()));
                    deliveryTag = result.getEnvelope().getDeliveryTag();
                    n--;
                
                else 
                    Thread.sleep(1000);
                
            
            if (deliveryTag > 0) 
                channel.basicAck(deliveryTag, true);
            
            return true;
        
    );

【讨论】:

GaryRussell 能否请您指出任何有关如何使用 execute() 的示例代码,我对此很陌生,我没有太多关于此的论坛。提前致谢 如果回答您的问题,习惯上将其标记为已接受(单击复选标记)。这将有助于其他用户搜索相同的答案。 @GaryRussell 如果我们使用 receive* 方法,Spring 是使用 NONE 模式还是 AUTO 模式?意思是,是让 RabbitMQ 认为消息是自动传递的,还是 Spring 在收到消息后做 ACK? 不要在 cmets 中提出新问题,尤其是在六年前的答案中。 Ack 模式仅适用于侦听器容器。模板在返回之前调用basicAck。如果要拒绝并重新排队消息,则必须在事务中运行 receive() 方法并抛出异常,以便回滚 basicAck 并将消息重新排队。

以上是关于在 RabbitMQ 中手动确认消息的主要内容,如果未能解决你的问题,请参考以下文章

在 RabbitMQ 中手动确认消息

springboot项目整合rabbitMq涉及消息的发送确认,消息的消费确认机制

手动的进行消息应答

rabbitmq 注解配置使用(四)消息确认(消费)

RabbitMQ确认机制问题处理

使用rabbitmq手动确认消息的,定时获取队列消息实现