是否有可能以某种方式同时在 ZMQ 中使用 Send/Recv(通过多线程)?

Posted

技术标签:

【中文标题】是否有可能以某种方式同时在 ZMQ 中使用 Send/Recv(通过多线程)?【英文标题】:Is is possible to somehow use Send/Recv in ZMQ simultaniousely(by multithreading)? 【发布时间】:2014-10-16 22:43:07 【问题描述】:

我正在尝试提出如何有效地使用 ZMQ 进行多线程(因此发送不会阻塞接收并且接收不会阻塞发送)。

我想使用ZMQ_DONTWAIT标志但是在发送数据时,它有时不会发送(EAGAIN错误,所以我必须重新排队处理消息,这在处理兆字节时浪费资源数据)。

我确实想出了以下代码:

Concurrency::concurrent_queue<zmq::message_t> QUEUE_IN;
Concurrency::concurrent_queue<zmq::message_t> QUEUE_OUT;

void SendThread(zmq::context_t &context) 
    zmq::socket_t zmq_socket(context, ZMQ_DEALER);
    zmq_socket.connect(string_format("tcp://%s:%s", address, port).c_str());
    zmq::message_t reply;
    while (true) 
        while (QUEUE_OUT.try_pop(reply))
            zmq_socket.send(reply);
        Sleep(1);
    


void RecvThread(zmq::context_t &context) 
    zmq::socket_t zmq_socket(context, ZMQ_DEALER);
    zmq_socket.connect(string_format("tcp://%s:%s", address, port).c_str());
    zmq::message_t reply;
    while (true) 
        while (zmq_socket.recv(&reply))
            QUEUE_IN.push(reply);
    


void ConnectionThread()

    zmq::context_t context(1);
    std::thread* threads[2] =  
        new std::thread(SendThread, context), 
        new std::thread(RecvThread, context)
    ;
    threads[0]->join();

但是,这将需要服务器端的两个套接字,并且我需要确定我需要向哪个发送数据以及我需要在服务器端监听哪个,对吗? 有没有办法在多线程环境中使用一个套接字而使用发送和接收?

我可能想在一个套接字上异步执行此操作,但在研究了异步示例之后,我仍然没有理解这个想法,因为它周围没有太多的 cmets。

【问题讨论】:

【参考方案1】:

避免睡眠

为了避免休眠,您可以使用 zmq_poll() 和 ZMQ_POLLOUT 事件来保护 send()。您不需要使用 ZMQ_DONTWAIT。 [我在那里使用了 C 函数,您的绑定将具有等价的。]

路由到 RecvThread

线程之间不能共享套接字,因此需要 2 个套接字才能工作。服务器只需要一个绑定到 2 个端口的套接字(可能是 ROUTER)。当它收到一条消息时,它需要知道将回复发送到哪里......

当 ROUTER 套接字接收到消息时,zmq 内部会在消息中添加一个带有发送者身份的帧。服务器代码将看到此帧,在构造消息以回复发送者时,它通常会使用相同的身份帧。在您的情况下,这是客户端的 SendThread。 OTOH,您要回复客户端的接收套接字,因此身份框架必须用于此。

剩下的就是服务端如何获取客户端接收套接字的标识帧了。为此,您需要发明一个小协议。安排客户端的 RecvThread 向服务器发送一条消息几乎就足够了。服务器应该理解该消息并简单地保留客户端接收套接字的标识帧,并在构造回复消息时使用它的副本。

所有这些都在“探索路由器套接字”下的指南中进行了解释。

【讨论】:

【参考方案2】:
    当发送大数据时(你说你在一条消息中发送 MB 的数据),这需要一些时间,ZMQ 不会“双工”发送和接收,因此它们都可以实际发生。 DONTWAIT 标志在那里不会对您有太大帮助,其目的是确保您在执行非 ZMQ 操作时不会等待 ZMQ。在任何情况下,所有消息仍应排队(除非受到高水位线的干扰) 唯一安全地使用多线程并行发送和接收的方法是使用多个套接字。

但是,也不全是坏事。如果您使用一个指定的发送套接字和一个指定的接收套接字,那么您可以使用 pub/sub,这会打开一些有趣的选项。

【讨论】:

@JohnJefferies 你说得对,我误读了警告in the guide here,单独的进程需要单独的上下文,单独的线程不需要。我已经相应地更新了我的答案。

以上是关于是否有可能以某种方式同时在 ZMQ 中使用 Send/Recv(通过多线程)?的主要内容,如果未能解决你的问题,请参考以下文章

是否有可能以某种方式使该程序崩溃?

是否有可能在反序列化时以某种方式捕获与任何 POCO 属性不匹配的 JSON 数据的其余部分?

是否可以以某种方式在表格行周围设置边框?

是否可以以某种方式将缩略图icn 添加到操作表?

ZMQ之脱机可靠性--巨人模式

FastAPI如何将ZMQ添加到事件循环