非阻塞 multiprocessing.connection.Listener?

Posted

技术标签:

【中文标题】非阻塞 multiprocessing.connection.Listener?【英文标题】:Non-blocking multiprocessing.connection.Listener? 【发布时间】:2016-12-20 13:53:01 【问题描述】:

我使用 multiprocessing.connection.Listener 进行进程之间的通信,它对我来说是一种魅力。现在我真的很喜欢我的主循环在来自客户端的命令之间做其他事情。不幸的是 listener.accept() 会阻塞执行,直到与客户端进程建立连接。

有没有一种简单的方法来管理 multiprocessing.connection 的非阻塞检查?暂停?还是我应该使用专用线程?

    # Simplified code:

    from multiprocessing.connection import Listener

    def mainloop():
        listener = Listener(address=(localhost, 6000), authkey=b'secret')

        while True:
            conn = listener.accept() # <---  This blocks!
            msg = conn.recv() 
            print ('got message: %r' % msg)
            conn.close()

【问题讨论】:

【参考方案1】:

我发现的一个解决方案(虽然它可能不是最“优雅”的解决方案是使用conn.poll。(documentation)如果侦听器有新数据,并且(最重要的是)是非阻塞的,则轮询返回True如果没有参数传递给它。我不是 100% 确定这是最好的方法,但我只运行一次 listener.accept() 就成功了,然后使用以下语法重复获取输入(如果有的话)

from multiprocessing.connection import Listener

def mainloop():
    running = True

    listener = Listener(address=(localhost, 6000), authkey=b'secret')
    conn = listener.accept()
    msg = ""

    while running:
        while conn.poll():
            msg = conn.recv() 
            print (f"got message: msg")

            if msg == "EXIT":
                running = False

        # Other code can go here
        print(f"I can run too! Last msg received was msg")

     conn.close()

条件语句中的“while”可以替换为“if”,如果您只想一次最多获取一条消息。谨慎使用,因为它看起来有点“hacky”,我还没有在其他地方找到使用 conn.poll 的参考资料。

【讨论】:

你试过了吗?现在靠谱吗? 是的,它通常非常可靠,尽管我绝不是多处理系统方面的专家。它在我拥有的项目上一直没有问题,所以是的,我会说它对我来说一直可靠地工作。话虽如此,这是一个快速的解决方案,但可能还有更优雅的解决方案。【参考方案2】:

我自己没有使用过 Listener 对象——对于这个任务我通常使用multiprocessing.Queue; doco 在以下链接:

https://docs.python.org/2/library/queue.html#Queue.Queue

该对象可用于通过一个不错的 API 在 Python 进程之间发送和接收任何可腌制对象;我想你最感兴趣的是:

在进程 A .put('some message') 在进程 B .get_nowait() # will raise Queue.Empty if nothing is available- handle that to move on with your execution

唯一的限制是您需要在某个时候控制两个 Process 对象,以便能够将队列分配给它们 - 如下所示:

import time
from Queue import Empty
from multiprocessing import Queue, Process


def receiver(q):
    while 1:
        try:
            message = q.get_nowait()
            print 'receiver got', message
        except Empty:
            print 'nothing to receive, sleeping'
            time.sleep(1)


def sender(q):
    while 1:
        message = 'some message'
        q.put('some message')
        print 'sender sent', message
        time.sleep(1)


some_queue = Queue()

process_a = Process(
    target=receiver,
    args=(some_queue,)
)

process_b = Process(
    target=sender,
    args=(some_queue,)
)

process_a.start()
process_b.start()

print 'ctrl + c to exit'
try:
    while 1:
        time.sleep(1)
except KeyboardInterrupt:
    pass

process_a.terminate()
process_b.terminate()

process_a.join()
process_b.join()

队列很好,因为您实际上可以为同一个 Queue 对象拥有任意数量的消费者和生产者(便于分配任务)。

我应该指出,仅在进程上调用 .terminate() 是不好的形式 - 您应该使用闪亮的新消息传递系统来传递关闭消息或类似性质的东西。

【讨论】:

要详细说明“您需要控制两个 Process 对象”,如果这是为了允许两个进程在不同时间启动或者可能完全存在于不同的服务器上,那么我恐怕我的建议没用 - 我想你需要一个线程或其他东西,为此我通常使用 gRPC。【参考方案3】:

多处理模块带有一个很好的特性,叫做 Pipe()。这是在两个进程之间共享资源的好方法(以前从未尝试过两个以上)。随着 python 3.80 的到来,多处理模块中的共享内存功能出现了,但我还没有真正测试过,所以我不能保证它 您将使用类似于

的管道功能
from multiprocessing import Pipe

.....

def sending(conn):
    message = 'some message'
    #perform some code
    conn.send(message)
    conn.close()

receiver, sender = Pipe()
p = Process(target=sending, args=(sender,))
p.start()
print receiver.recv()   # prints "some message"
p.join()

有了这个,您应该能够让单独的进程独立运行,并且当您到达需要来自一个进程的输入的地步时。如果由于其他进程的未释放数据而出现某种错误,您可以将其置于某种睡眠或暂停状态,或者使用 while 循环在其他进程完成该任务并将其发送过来时不断检查挂起

while not parent_conn.recv():
    time.sleep(5)

这应该让它处于无限循环中,直到另一个进程完成运行并发送结果。这也比队列快大约 2-3 倍。虽然队列也是一个不错的选择我个人不使用它。

【讨论】:

【参考方案4】:

你可以在线程中运行阻塞函数:

conn = await loop.run_in_executor(None, listener.accept)

【讨论】:

以上是关于非阻塞 multiprocessing.connection.Listener?的主要内容,如果未能解决你的问题,请参考以下文章

verilog 阻塞和非阻塞啥区别啊?

非阻塞socket总结

同步阻塞同步非阻塞异步阻塞异步非阻塞--简明介绍

并行,并发,串行,同步,异步,阻塞,非阻塞,同步阻塞,同步非阻塞,异步阻塞,异步非阻塞

verilog中为啥非阻塞赋值要用绝对时延

同步异步、阻塞非阻塞、Netty