zeromq:如何防止无限等待?
Posted
技术标签:
【中文标题】zeromq:如何防止无限等待?【英文标题】:zeromq: how to prevent infinite wait? 【发布时间】:2011-11-24 05:40:47 【问题描述】:我刚开始使用 ZMQ。我正在设计一个应用程序,其工作流程是:
-
众多客户端之一(具有随机 PULL 地址)向 5555 处的服务器推送请求
服务器一直在等待客户端推送。当一个请求出现时,会为该特定请求生成一个工作进程。是的,工作进程可以同时存在。
当该进程完成其任务时,它会将结果推送给客户端。
我假设 PUSH/PULL 架构适合此。请对此纠正我。
但是我该如何处理这些情况呢?
-
当服务器无法响应时,client_receiver.recv() 将无限等待。
客户端可能会发送请求,但它会立即失败,因此工作进程将永远停留在 server_sender.send()。
那么如何在 PUSH/PULL 模型中设置超时?
编辑:感谢 user938949 的建议,我得到了一个有效的答案,我将其分享给后代。
【问题讨论】:
我不是 0mq 专家,但在很多这样的情况下,最好在启动时创建工作池,而不是创建工作人员来响应消息。可能我误会你了。 好点。我实际上计划预先分叉工人。我刚刚意识到使用 0mq 可以变得微不足道。 【参考方案1】:如果您使用 zeromq >= 3.0,那么您可以设置 RCVTIMEO 套接字选项:
client_receiver.RCVTIMEO = 1000 # in milliseconds
但一般来说,您可以使用轮询器:
poller = zmq.Poller()
poller.register(client_receiver, zmq.POLLIN) # POLLIN for recv, POLLOUT for send
而poller.poll()
需要超时:
evts = poller.poll(1000) # wait *up to* one second for a message to arrive.
如果没有可接收的内容,evts
将是一个空列表。
您可以使用zmq.POLLOUT
进行轮询,以检查发送是否成功。
或者,为了处理可能失败的对等点的情况,a:
worker.send(msg, zmq.NOBLOCK)
可能就足够了,它总是会立即返回 - 如果发送无法完成,则会引发 ZMQError(zmq.EAGAIN)。
【讨论】:
你能详细说明一下 zmq.NOBLOCK 吗? 嗨,我们每次断开连接并重新连接时都必须重新注册一个套接字(在轮询器中)吗? 不行,只有关闭socket再打开一个新的才需要重新注册。 正如@Adobri 和@mknaf 所说:如果使用zmq.RCVTIMEO
,您还需要设置zmq.LINGER
,否则即使超时,套接字仍然不会关闭。在 Python 中,它是 socket.setsockopt(zmq.RCVTIMEO, 1000)
socket.setsockopt(zmq.LINGER, 0)
message = socket.recv()
这两行都在 python 中工作:results_receiver.RCVTIMEO = 1000
和 results_receiver.setsockopt(zmq.RCVTIMEO, 1000)
【参考方案2】:
这是我在参考 user938949 的答案和 http://taotetek.wordpress.com/2011/02/02/python-multiprocessing-with-zeromq/ 之后做出的快速破解。如果你做得更好,请发表你的答案,我会推荐你的答案。
对于那些想要持久解决方案可靠性的人,请参考http://zguide.zeromq.org/page:all#toc64
zeromq 3.0 版(beta ATM)在 ZMQ_RCVTIMEO 和 ZMQ_SNDTIMEO 中支持 超时。 http://api.zeromq.org/3-0:zmq-setsockopt
服务器
zmq.NOBLOCK 确保当客户端不存在时,send() 不会阻塞。
import time
import zmq
context = zmq.Context()
ventilator_send = context.socket(zmq.PUSH)
ventilator_send.bind("tcp://127.0.0.1:5557")
i=0
while True:
i=i+1
time.sleep(0.5)
print ">>sending message ",i
try:
ventilator_send.send(repr(i),zmq.NOBLOCK)
print " succeed"
except:
print " failed"
客户
轮询器对象可以侦听许多接收套接字(请参阅上面链接的“Python Multiprocessing with ZeroMQ”。我仅将它链接到 work_receiver。在无限循环中,客户端以间隔轮询1000 毫秒。如果在那段时间内没有收到任何消息,socks 对象返回空。
import time
import zmq
context = zmq.Context()
work_receiver = context.socket(zmq.PULL)
work_receiver.connect("tcp://127.0.0.1:5557")
poller = zmq.Poller()
poller.register(work_receiver, zmq.POLLIN)
# Loop and accept messages from both channels, acting accordingly
while True:
socks = dict(poller.poll(1000))
if socks:
if socks.get(work_receiver) == zmq.POLLIN:
print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
print "error: message timeout"
【讨论】:
Python 有select
吗? Ruby 的库有一个类似IO.select
的库。您可以:if ZMQ.select([read_socket], nil, nil, 20); puts read_socket.recv; else; puts 'timeout 20 secs'; end
【参考方案3】:
如果你使用 ZMQ_NOBLOCK,发送不会阻塞,但是如果你尝试关闭套接字和上下文,这一步会阻止程序退出..
原因是套接字等待任何对等方,以确保传出消息排队。要立即关闭套接字并从缓冲区中刷新传出消息,请使用 ZMQ_LINGER 并将其设置为 0..
【讨论】:
zmq.RCVTIMEO 如果您不使用 zmq.LINGER 将无济于事,因为超时后套接字仍然不会关闭。这应该添加到所选答案中。【参考方案4】:如果您只等待一个套接字,而不是创建Poller
,您可以这样做:
if work_receiver.poll(1000, zmq.POLLIN):
print "got message ",work_receiver.recv(zmq.NOBLOCK)
else:
print "error: message timeout"
如果你的超时时间根据情况发生变化,你可以使用它,而不是设置work_receiver.RCVTIMEO
。
【讨论】:
不错的答案:)以上是关于zeromq:如何防止无限等待?的主要内容,如果未能解决你的问题,请参考以下文章
React 限制渲染次数以防止无限循环...重新渲染次数过多
为啥错误:重新渲染太多。 React 限制了渲染的数量以防止无限循环。?