zeromq:如何防止无限等待?

Posted

技术标签:

【中文标题】zeromq:如何防止无限等待?【英文标题】:zeromq: how to prevent infinite wait? 【发布时间】:2011-11-24 05:40:47 【问题描述】:

我刚开始使用 ZMQ。我正在设计一个应用程序,其工作流程是:

    众多客户端之一(具有随机 PULL 地址)向 5555 处的服务器推送请求 服务器一直在等待客户端推送。当一个请求出现时,会为该特定请求生成一个工作进程。是的,工作进程可以同时存在。 当该进程完成其任务时,它会将结果推送给客户端。

我假设 PUSH/PULL 架构适合此。请对此纠正我


但是我该如何处理这些情况呢?

    当服务器无法响应时,client_receiver.recv() 将无限等待。 客户端可能会发送请求,但它会立即失败,因此工作进程将永远停留在 server_sender.send()。

那么如何在 PUSH/PULL 模型中设置超时


编辑:感谢 user938949 的建议,我得到了一个有效的答案,我将其分享给后代。

【问题讨论】:

我不是 0mq 专家,但在很多这样的情况下,最好在启动时创建工作池,而不是创建工作人员来响应消息。可能我误会你了。 好点。我实际上计划预先分叉工人。我刚刚意识到使用 0mq 可以变得微不足道。 【参考方案1】:

如果您使用 zeromq >= 3.0,那么您可以设置 RCVTIMEO 套接字选项:

client_receiver.RCVTIMEO = 1000 # in milliseconds

但一般来说,您可以使用轮询器:

poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send

poller.poll() 需要超时:

evts = poller.poll(1000) # wait *up to* one second for a message to arrive.

如果没有可接收的内容,evts 将是一个空列表。

您可以使用zmq.POLLOUT 进行轮询,以检查发送是否成功。

或者,为了处理可能失败的对等点的情况,a:

worker.send(msg, zmq.NOBLOCK)

可能就足够了,它总是会立即返回 - 如果发送无法完成,则会引发 ZMQError(zmq.EAGAIN)。

【讨论】:

你能详细说明一下 zmq.NOBLOCK 吗? 嗨,我们每次断开连接并重新连接时都必须重新注册一个套接字(在轮询器中)吗? 不行,只有关闭socket再打开一个新的才需要重新注册。 正如@Adobri 和@mknaf 所说:如果使用zmq.RCVTIMEO,您还需要设置zmq.LINGER,否则即使超时,套接字仍然不会关闭。在 Python 中,它是 socket.setsockopt(zmq.RCVTIMEO, 1000) socket.setsockopt(zmq.LINGER, 0) message = socket.recv() 这两行都在 python 中工作:results_receiver.RCVTIMEO = 1000results_receiver.setsockopt(zmq.RCVTIMEO, 1000)【参考方案2】:

这是我在参考 user938949 的答案和 http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/ 之后做出的快速破解。如果你做得更好,请发表你的答案,我会推荐你​​的答案

对于那些想要持久解决方案可靠性的人,请参考http://zguide.zeromq.org/page:all#toc64

zeromq 3.0 版(beta ATM)在 ZMQ_RCVTIMEO 和 ZMQ_SNDTIMEO 中支持 超时。 http://api.zeromq.org/3-0:zmq-setsockopt

服务器

zmq.NOBLOCK 确保当客户端不存在时,send() 不会阻塞。

import time
import zmq
context = zmq.Context()

ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")

i=0

while True:
    i=i+1
    time.sleep(0.5)
    print ">>sending message ",i
    try:
        ventilator_send.send(repr(i),zmq.NOBLOCK)
        print "  succeed"
    except:
        print "  failed"

客户

轮询器对象可以侦听许多接收套接字(请参阅上面链接的“Python Multiprocessing with ZeroMQ”。我仅将它链接到 work_receiver。在无限循环中,客户端以间隔轮询1000 毫秒。如果在那段时间内没有收到任何消息,socks 对象返回空。

import time
import zmq
context = zmq.Context()

work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")

poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)

# Loop and accept messages from both channels, acting accordingly
while True:
    socks = dict(poller.poll(1000))
    if socks:
        if socks.get(work_receiver) == zmq.POLLIN:
            print "got message ",work_receiver.recv(zmq.NOBLOCK)
    else:
        print "error: message timeout"

【讨论】:

Python 有select 吗? Ruby 的库有一个类似IO.select 的库。您可以:if ZMQ.select([read_socket], nil, nil, 20); puts read_socket.recv; else; puts 'timeout 20 secs'; end【参考方案3】:

如果你使用 ZMQ_NOBLOCK,发送不会阻塞,但是如果你尝试关闭套接字和上下文,这一步会阻止程序退出..

原因是套接字等待任何对等方,以确保传出消息排队。要立即关闭套接字并从缓冲区中刷新传出消息,请使用 ZMQ_LINGER 并将其设置为 0..

【讨论】:

zmq.RCVTIMEO 如果您不使用 zmq.LINGER 将无济于事,因为超时后套接字仍然不会关闭。这应该添加到所选答案中。【参考方案4】:

如果您只等待一个套接字,而不是创建Poller,您可以这样做:

if work_receiver.poll(1000, zmq.POLLIN):
    print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
    print "error: message timeout"

如果你的超时时间根据情况发生变化,你可以使用它,而不是设置work_receiver.RCVTIMEO

【讨论】:

不错的答案:)

以上是关于zeromq:如何防止无限等待?的主要内容,如果未能解决你的问题,请参考以下文章

React 限制渲染次数以防止无限循环...重新渲染次数过多

如果启用了无限滚动,如何防止剑道网格两次加载数据?

如何防止'GMSMapView'无限水平滚动?

为啥错误:重新渲染太多。 React 限制了渲染的数量以防止无限循环。?

componentDidUpdate 错误是 React 限制嵌套更新的数量以防止无限循环如何解决此问题

防止重复无限滚动ajax加载器