在 RabbitMQ 中手动确认消息
Posted
技术标签:
【中文标题】在 RabbitMQ 中手动确认消息【英文标题】:Manually ack messages in RabbitMQ 【发布时间】:2015-06-21 08:58:07 【问题描述】:以前我正在阅读队列中存在的所有消息,但现在我必须根据用户的选择(计数)返回特定数量的消息。
我尝试相应地更改 for 循环,但由于自动确认,它会读取所有消息。所以我尝试在配置文件中将其更改为手动。
在我的程序中,如何在阅读 msg 后手动确认消息(目前我正在使用 AmqpTemplate 接收并且我没有频道参考)?
Properties properties = admin.getQueueProperties("queue_name");
if(null != properties)
Integer messageCount = Integer.parseInt(properties.get("QUEUE_MESSAGE_COUNT").toString());
while(messageCount > 0)
Message msg = amqpTemplate.receive(queue_name);
String value = new String(msg.getBody());
valueList.add(value);
messageCount--;
任何帮助都非常感谢,在此先感谢。
【问题讨论】:
AmqpTemplate#receive
自动确认消息,除非通道被交易。要控制确认,您可以使用AmqpTemplate#execute
并手动接收,或者最好的方法是使用SimpleMessageListenerContainer
甚至BlockingQueueConsumer
@NicolasLabrot 我没有在 AmqpTemplate 中找到执行方法,你指的是别的东西吗?是的,我确实在 SimpleMessageListenerContainer 中将 setAcknowledgeMode 设置为 MANUAL。
对不起,我指的是RabbitTemplate#execute
,它是AmqpTemplate
的实现
@NicolasLabrot 您能否对此有所了解。什么是 ChannelCallback,看起来我需要一个我没有的频道参考。
看看RabbitTemplate#receive
code 但我不认为这是正确的方法。
【参考方案1】:
您不能使用 receive()
方法手动确认 - 使用 SimpleMessageListenerContainer
用于带有手动确认和 ChannelAwareMessageListener
的事件驱动消费者。或者,使用模板的 execute()
方法,它可以让您访问 Channel
- 但是您将使用较低级别的 RabbitMQ API,而不是 Message
抽象。
编辑:
您需要学习底层的 RabbitMQ Java API 才能使用执行,但是这样的东西会起作用...
final int messageCount = 3;
boolean result = template.execute(new ChannelCallback<Boolean>()
@Override
public Boolean doInRabbit(final Channel channel) throws Exception
int n = messageCount;
channel.basicQos(messageCount); // prefetch
long deliveryTag = 0;
while (n > 0)
GetResponse result = channel.basicGet("si.test.queue", false);
if (result != null)
System.out.println(new String(result.getBody()));
deliveryTag = result.getEnvelope().getDeliveryTag();
n--;
else
Thread.sleep(1000);
if (deliveryTag > 0)
channel.basicAck(deliveryTag, true);
return true;
);
【讨论】:
GaryRussell 能否请您指出任何有关如何使用 execute() 的示例代码,我对此很陌生,我没有太多关于此的论坛。提前致谢 如果回答您的问题,习惯上将其标记为已接受(单击复选标记)。这将有助于其他用户搜索相同的答案。 @GaryRussell 如果我们使用 receive* 方法,Spring 是使用 NONE 模式还是 AUTO 模式?意思是,是让 RabbitMQ 认为消息是自动传递的,还是 Spring 在收到消息后做 ACK? 不要在 cmets 中提出新问题,尤其是在六年前的答案中。 Ack 模式仅适用于侦听器容器。模板在返回之前调用basicAck
。如果要拒绝并重新排队消息,则必须在事务中运行 receive()
方法并抛出异常,以便回滚 basicAck
并将消息重新排队。以上是关于在 RabbitMQ 中手动确认消息的主要内容,如果未能解决你的问题,请参考以下文章