SimpleAmqpClient Channel::basicConsume() ,如何从 RabbitMQ 获取所有消息
Posted
技术标签:
【中文标题】SimpleAmqpClient Channel::basicConsume() ,如何从 RabbitMQ 获取所有消息【英文标题】:SimpleAmqpClient Channel::basicConsume() ,how to fetch all the messages from the RabbitMQ 【发布时间】:2018-02-21 12:22:22 【问题描述】:这是一个非常基本的问题,我将把它分成两部分
用例: 我正在编写一个需要与服务器上运行的 RabbitMQ 通信的 c++ 应用程序。目前,我能够在队列上发布消息并使用队列中的消息(如果队列中有 n 条消息,则只消耗一条消息)。
我可以从 basicConsume() 方法的描述中读到 "开始使用队列中的基本消息 作为消费者订阅队列,因此队列中的所有未来消息 将是 Basic.Delivered"
因此,每当我在队列上执行 basicConsume() 时,所有消息都会传递给我(在服务器上,我的队列为空),
我正在做类似的事情
#include <SimpleAmqpClient/SimpleAmqpClient.h>
#include<iostream>
#include<string>
using namespace std;
using namespace AmqpClient;
int main()
Channel::ptr_t connection = Channel::Create("ip",5672,usename,password);
const std::string body("Test Message3");
BasicMessage::ptr_t out_message = BasicMessage::Create(body);
//code to publish , works fine
//connection->BasicPublish("exchange", "key",BasicMessage::Create("Message1"));
//BasicConsume say we have 5 messages in my queue
string consumer_tag=connection->BasicConsume("queue","");
Envelope::ptr_t envelope = connection->BasicConsumeMessage(consumer_tag);
connection->BasicAck(envelope->GetDeliveryInfo());
BasicMessage::ptr_t bodyBasicMessage=envelope->Message();
string messageBody=bodyBasicMessage->Body();
cout<<"message body : "<<messageBody<<endl;
我只能读取队列中的第一条消息。
问题:
-
如何让 C++ 消费者持续监听我的队列,并且只要有消息应用程序就会使用它(托管 c++ 应用程序/使用线程/使用无限循环)我没有任何线程经验或托管c++ 应用程序(目前我只限于可执行文件),所以任何建议的阅读都是最受欢迎的。
如何阅读被丢弃的其余 4 条消息。
非常感谢。
【问题讨论】:
【参考方案1】:RabbitMQ 团队会监控 this mailing list,但有时只会在 *** 上回答问题。
根据以下问题,您必须通过调用BasicConsumeMessage
来消费每条消息 -
https://github.com/alanxz/SimpleAmqpClient/issues/162
这个库不支持异步回调。
【讨论】:
【参考方案2】:感谢@Luke 的回复。 我通过执行以下操作解决了丢弃消息的问题 将 noAck 设置为 false 的 BasicConsume -> basicConsumeMessage 在无限循环内,后跟 basicAck() 通过这样做,我的应用程序等待从队列中消费(当队列中没有消息时不会中断)并且队列不会向我发送下一条消息,直到它收到第一个消息的 ACK。
现在我打算把它做成一个守护进程,所以关闭终端不会影响执行。
欢迎任何其他建议。
【讨论】:
以上是关于SimpleAmqpClient Channel::basicConsume() ,如何从 RabbitMQ 获取所有消息的主要内容,如果未能解决你的问题,请参考以下文章
Kotlin 协程Channel 通道 ① ( Channel#send 发送数据 | Channel#receive 接收数据 )
Kotlin 协程Channel 通道 ① ( Channel#send 发送数据 | Channel#receive 接收数据 )
Kotlin 协程Channel 通道 ④ ( Channel 通道的热数据流属性 | Channel 通道关闭过程 | Channel 通道关闭代码示例 )