如何在 ZeroMQ 中以正确的方式中止 context.socket.recv()?

Posted

技术标签:

【中文标题】如何在 ZeroMQ 中以正确的方式中止 context.socket.recv()?【英文标题】:How to abort context.socket.recv() the right way in ZeroMQ? 【发布时间】:2015-06-04 22:41:15 【问题描述】:

我有一个小软件,其中有一个单独的线程正在等待 ZeroMQ 消息。我使用的是ZeroMQ的PUB/SUB通信协议。

目前我正在通过将变量“cont_loop”设置为False来中止该线程。

但我发现,当没有消息到达 ZeroMQ 订阅者时,我无法退出线程(不关闭整个程序)。

def __init__(self):
    Thread.__init__(self)
    self.cont_loop = True

def abort(self):
    self.continue_loop = False

def run(self):
    zmq_context = zmq.Context()
    zmq_socket = zmq_context.socket(zmq.SUB)
    zmq_socket.bind("tcp://*:%s" % *(5556))
    zmq_socket.setsockopt(zmq.SUBSCRIBE, "")
    while self.cont_loop:
        data = zmq_socket.recv()
        print "Message: " + data
    zmq_socket.close()
    zmq_context.term()
    print "exit"

我试图将socket.close()context.term() 移动到中止方法。这样它就会关闭订阅者,但这会杀死整个程序。

关闭上述程序的正确方法是什么?

【问题讨论】:

【参考方案1】:

问:正确的方法是什么?

答:有很多方法可以实现既定目标。让我只选择一个,作为如何处理分布式进程到进程消息传递的模型示例。

首先。假设在典型的软件设计任务中有更多的优先级。有些更高,有些更低,有些甚至如此之低,以至于可以推迟执行这些低优先级的子任务,以便调度程序中有更多时间来执行那些无法处理等待的子任务。

这就是说,让我们查看您的代码。 SUB-side 对 .recv() 的指令被使用,导致两件事。一个可见 - 它使用 SUB-behaviour 在 ZeroMQ 套接字上执行 RECEIVE 操作。第二个不太明显的是,它一直挂起,直到它获得与SUB-behaviour 的当前状态“兼容”的东西(稍后将详细介绍设置)。

这意味着,它也一直阻塞,因为这样的.recv() 方法调用UNTIL 一些未知的、本地无法控制的状态/事件的巧合使其能够传递 ZeroMQ -message,其内容是 “兼容” 的本地预设状态(仍然阻塞)SUB-behaviour 实例。

这可能需要很长时间。

这正是.recv() 在控制循环中使用的原因,在控制循环中,外部处理既有机会又有责任去做你想做的事(包括与中止相关的操作和公平/优雅的终止与适当的资源' 释放)。

接收过程变为.recv( flags = zmq.NOBLOCK ),而不是try: except: 插曲。这样,您的本地进程就不会失去对事件流的控制(包括 NOP 就是这样)。

最好的下一步?

花点时间阅读一本精彩的书籍,“Code Connected, Volume 1”,ZeroMQ 的共同之父 Pieter HINTJENS 已出版(也称为 PDF)。

他与我们分享的许多要避免的想法和错误确实值得您花时间。

享受 ZeroMQ 的强大功能。它非常强大,值得自上而下掌握。

【讨论】:

非常感谢您的回答!信息量很大。

以上是关于如何在 ZeroMQ 中以正确的方式中止 context.socket.recv()?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Dart 中以正确的方式重定向和重新加载?

在视图中以编程方式添加图像

如何使用 ZeroMQ 从 C# 客户端向 C++ 服务器发送消息

如何正确处理 ZeroMQ.js 中的连接超时?

是否可以在浏览器中以编程方式捕获页面上的所有事件?

zeromq使用模式实验总结