zmq 轮询器是如何工作的?

Posted

技术标签:

【中文标题】zmq 轮询器是如何工作的?【英文标题】:How does zmq poller work? 【发布时间】:2013-08-09 11:25:57 【问题描述】:

我对 poller 在 zmq 中的实际作用感到困惑。 zguide 对其进行了最低限度的介绍,并且仅将其描述为一种从多个套接字读取的方式。这对我来说不是一个令人满意的答案,因为它没有解释如何拥有超时套接字。我知道zeromq: how to prevent infinite wait? 解释了推/拉,但没有解释 req/rep 模式,这是我想知道如何使用的。

我想问的是:poller 是如何工作的,它的功能如何应用于跟踪套接字及其请求?

【问题讨论】:

您能否更清楚地说明您的具体问题是什么? 【参考方案1】:

当你需要在同一个线程中监听不同的套接字时,使用轮询器:

ZMQ.Socket subscriber = ctx.socket(ZMQ.SUB)
ZMQ.Socket puller = ctx.socket(ZMQ.PULL)

向轮询器注册套接字(POLLIN 监听传入消息)

ZMQ.Poller poller = ZMQ.Poller(2)
poller.register(subscriber, ZMQ.Poller.POLLIN)
poller.register(puller, ZMQ.Poller.POLLIN)

轮询时,使用循环:

while( notInterrupted())
  poller.poll()

  //subscriber registered at index '0'
  if( poller.pollin(0)) 
     subscriber.recv(ZMQ.DONTWAIT)

  //puller registered at index '1'
  if( poller.pollin(1))
     puller.recv( ZMQ.DONTWAIT)

选择您想要的投票方式...

poller.poll() 阻塞,直到任一套接字上有数据。poller.poll(1000) 阻塞 1 秒,然后超时。

当套接字上有可用的数据(消息)时,轮询器会通知;阅读它是你的工作。

阅读时,不要阻塞:socket.recv( ZMQ.DONTWAIT)。即使poller.pollin(0) 检查是否有要读取的数据,您也希望避免在轮询循环内出现任何阻塞调用,否则,您可能最终会由于“卡住”套接字而阻塞轮询器。

因此,如果将两条单独的消息发送到subscriber,您必须调用两次subscriber.recv() 才能清除轮询器,否则,如果您调用一次subscriber.recv(),轮询器将继续告诉您还有另一条消息被阅读。因此,本质上,轮询器跟踪消息的可用性和数量,而不是实际消息。

您应该浏览投票示例并使用代码,这是最好的学习方式。

这能回答你的问题吗?

【讨论】:

我们如何知道应该调用多少次subscriber.recv() 来清除轮询器? @Meysam,正如 raffian 所示,我们可以在循环中一个接一个地处理消息。不需要一次性付款,因为 0mq 提供了队列,并且在我们处理它的过程中消息可能会不断出现。此外,还有诸如 callback (on_recv) 之类的奇特方式来处理接收到的消息。 (Python示例可以找到PyZmq Tornado Event Loop)【参考方案2】:

在这个答案中我列出了

来自文档http://api.zeromq.org/4-1:zmq-poll的详细信息

此外,我还添加了一些重要的解释以及为新用户清除困惑的内容!如果你赶时间!您可能希望从轮询器做什么以及如何接收关于接收和仅一个套接字的说明部分开始!从重要说明部分开始!我在哪里深入了解事情!我仍然建议仔细阅读文档参考中的详细信息!还有第一节!

文档参考和注释

监听多个套接字和事件

zmq_poll() 函数为应用程序提供了一种机制,可以在一组套接字上以电平触发的方式多路复用输入/输出事件。 items 参数指向的数组的每个成员都是一个 zmq_pollitem_t 结构。 nitems 参数指定 items 数组中的项目数。 zmq_pollitem_t结构定义如下:

typedef struct

    void //*socket//;
    int //fd//;
    short //events//;
    short //revents//;
 zmq_pollitem_t;

zmq Socket或标准socket通过fd

对于每个 zmq_pollitem_t 项目,zmq_poll() 应检查 ØMQ 套接字 套接字引用指定的标准套接字 文件描述符 fd,用于事件中指定的事件。 如果socket和fd都设置在一个zmq_pollitem_t中,则socket引用的ØMQ socket优先,fd的值会被忽略。

大笔记(相同的上下文):

传递给 zmq_poll() 函数的所有 ØMQ 套接字必须共享相同的 ØMQ 上下文,并且必须属于调用 zmq_poll() 的线程。

Reevents 成员

对于每一个zmq_pollitem_t项目,zmq_poll()首先要清除revents成员,然后通过设置revents成员中事件条件对应的位来指示任何请求的事件已经发生。

成功完成后,zmq_poll() 函数应返回 zmq_pollitem_t 结构的数量,其中包含在 revents 中发出的事件,如果没有发出任何事件,则返回 0。

等待事件和阻塞

如果没有请求的事件发生 任何zmq_pollitem_t 项目zmq_poll()等待超时微秒,以便在任何请求的项目上发生事件。如果timeout 的值为0zmq_poll() 将立即返回。如果 timeout 的值为 -1,zmq_poll() 将 无限期阻塞 直到请求的事件发生 在至少一个 zmq_pollitem_t 上。 超时分辨率为1毫秒

0 => 不等待

-1 => 块

+val => 阻塞等待超时量

活动

zmq_pollitem_t 的 events 和 revents 成员是通过 OR'ing 以下事件标志的组合构造的位掩码:

ZMQ_POLLIN

对于 ØMQ 套接字,至少可以从套接字接收一条消息而不会阻塞。对于标准套接字,这相当于 poll() 系统调用的 POLLIN 标志,通常意味着可以从 fd 读取至少一个字节的数据而不会阻塞。

ZMQ_POLLOUT

对于ØMQ套接字,至少有一条消息可以不阻塞地发送到套接字。对于标准套接字,这相当于 poll() 系统调用的 POLLOUT 标志,通常意味着至少一个字节的数据可以写入 fd 而不会阻塞。

ZMQ_POLLERR

对于标准套接字,此标志通过 zmq_poll() 传递给底层的 poll() 系统调用,通常意味着 fd 指定的套接字上存在某种错误情况。对于 ØMQ 套接字,如果在事件中设置此标志则无效,并且不会被 zmq_poll() 在 revents 中返回。

注意:

zmq_poll() 函数可以使用 poll() 以外的操作系统接口实现或模拟,因此可能会以本文档中未定义的方式受到这些接口的限制。

返回值

成功完成后,zmq_poll() 函数应返回 zmq_pollitem_t 结构的数量,其中包含在 revents 中发出的事件,如果没有发出任何事件,则返回 0。失败时,zmq_poll() 将返回 -1 并将 errno 设置为下面定义的值之一。

示例

无限期地轮询 0mq 套接字和标准套接字上的输入事件。

zmq_pollitem_t items [2];
/* First item refers to ØMQ socket 'socket' */
items[0].socket = socket;
items[0].events = ZMQ_POLLIN;
/* Second item refers to standard socket 'fd' */
items[1].socket = NULL;
items[1].fd = fd;
items[1].events = ZMQ_POLLIN;
/* Poll for events indefinitely */
int rc = zmq_poll (items, 2, -1);
assert (rc >= 0); /* Returned events will be stored in items[].revents */

重要提示

轮询器做什么以及接收什么

轮询器只检查和等待事件何时发生! POLLIN 用于接收!有数据可以接收! 然后我们应该通读recv()!我们有责任阅读或做任何事情!轮询器只是在那里收听事件并等待它们!并且通过zmq_pollitem_t我们可以监听多个事件!如果发生任何事件!然后轮询器解除阻塞!然后我们可以在recv中检查事件!和 zmq_pollitem_t!请注意,轮询器在事件触发时将它们排队!下一个电话将从队列中挑选!因此,订单也被保留了!并且连续调用将返回下一个事件,依此类推!当他们进来时!

关于接收的说明以及仅一个套接字的情况

对于路由器!一个路由器甚至可以从一个客户端接收多个请求!并且同时来自多个客户!在多个客户端具有相同性质的设置中!并且是连接到路由器的那些!一个新人可能会想到的问题是!对于这种异步性质,我是否需要轮询器!答案是否定的!无需轮询器和侦听不同的套接字!

重要的是:接收调用(zmq_recv()、socket.recv() 一些 lang 绑定)!堵塞!并且是阅读的方式!消息来的时候!他们在排队!轮询器与此无关!轮询器只监听来自不同套接字的事件!如果其中任何一个发生,请取消阻止!如果达到超时,则不会发生任何事件!仅此而已!

接收的性质是直截了当的!接收呼叫阻塞!直到消息队列中的一条消息到来!当多人来时,他们将排队!然后在每次下一次调用 recv() 时!我们将拉下一条消息!或者框架! (取决于我们使用的接收方法!和 api 级别!以及从绑定库到低级别的抽象!) 因为我们也可以按帧访问消息!每次通话一帧! 但到了这里就清楚了! 接电话就是要接的东西!他们阻止直到消息进入队列!多个并行消息!他们来了就会排队!然后每次通话!队列是否已满!消费它!或者等等! 这是一件非常重要的事情!这会让新人感到困惑!

只有当有多个套接字时才需要轮询器!它们总是我们在相关进程代码上声明的套接字(绑定它们,或连接到某个东西)!因为如果没有!你将如何收到消息!你不能做好!因为你必须优先考虑一个或另一个!在一个有一个 recv() 的循环中先行!哪个会阻塞!即使另一个套接字在它的队列中收到一条消息!循环被阻塞,无法进行下一个recv()!因为轮询器给了我们能够解决这个问题的美丽!并且适用于多个套接字!

while(true) 
    socket1.recv() // this will block

    socket2.recv() // this will have to wait till the first recieve! Even if messages come in in it's queue

使用轮询器:

While(true) 
    zmq_poll() // block till one of the socket events happen! If the event was POLLIN!
            // If any socket get a message to it's queue
            // This will unblock
    // then we check which type and which socket was
    if (condition socket 1) 
        // treat socket 1 request
    

    if (condition socket 2) 
        // treat socket 2 request
    
 
    // ...

您可以从this section 的文档中查看真实代码(滚动到足以查看代码块,您也可以看到所有不同的语言)

知道轮询器只是通知有消息!如果是POLLIN!

每次迭代中!如果已经触发了许多事件,则轮询器! Let's give the example of 10 messages recieved 5 in each socket轮询器已经将事件排入队列!并且在每个下一个电话中进行了 9 次!将立即解决!有问题的消息可以映射到哪个套接字(通过使用轮询器对象和意思!所以绑定库使它变得过于简单和令人愉快)!然后正确的套接字块将发出接收调用!当它执行时它将消耗它队列中的下一条消息

所以你继续循环并且每次消耗下一条消息他们进来时轮询器跟踪了那里的进入顺序通过订阅和选择收听的事件接收情况,应该是POLLIN

那么每个套接字都有它的消息队列!并且每次接听电话摆脱它民意调查者跟踪了他们!所以当轮询器解决时确保有消息通知套接字接收呼叫

最后一个例子:服务器客户端模式

让我们以一台服务器(路由器)和许多连接到的客户端(经销商)为例!如下图所示!

问题:多个连接到同一个路由器!立即异步发送!布拉布拉布拉!在服务器端(路由器)! 我需要投票器吗!? 很多新人,可能会认为是或质疑是否需要!是的,你猜对了!

大不了!

为什么?因为在服务器(路由器)代码中!我们只有一个正在处理的套接字!我们绑定!客户端然后连接到它!到此为止!只有一个插座!并且所有 recv() 调用都在那个套接字上!那个套接字有它的消息队列! recv() 一个接一个地消费消息!异步无关紧要,它们是如何产生的!同样,轮询器仅在有多个套接字时才有效!因此具有处理来自多个套接字的消息的混合性质!如果不!然后一个套接字的一个 recv() 需要先去然后另一个!并且会挡住对方!不是好事(坏事)!

注意

这个答案带来了一个很好的清理!此外,它还引用了具有良好突出显示的文档!还显示低级库(c lang)的代码! @rafflan 的 answer 展示了一个带有绑定库的很棒的代码(似乎是 c#)!和一个很好的解释!如果你没有检查,你必须!

【讨论】:

以上是关于zmq 轮询器是如何工作的?的主要内容,如果未能解决你的问题,请参考以下文章

Spring集成多轮询器

Sprint 集成 DSL - Http 入站适配器和轮询器

Cacti优化之spine轮询器

是否可以使轮询器(或 PollableMessageSource)将消息作为列表轮询?

没有为通道适配器定义轮询器

AngularJs轮询器写法