我可以以某种方式与子进程共享一个异步队列吗?
Posted
技术标签:
【中文标题】我可以以某种方式与子进程共享一个异步队列吗?【英文标题】:Can I somehow share an asynchronous queue with a subprocess? 【发布时间】:2014-09-01 10:46:59 【问题描述】:我想使用队列将数据从父进程传递到通过multiprocessing.Process
启动的子进程。但是,由于父进程使用 Python 的新 asyncio
库,队列方法需要是非阻塞的。据我了解,asyncio.Queue
是为任务间通信而制作的,不能用于进程间通信。另外,我知道multiprocessing.Queue
有put_nowait()
和get_nowait()
方法,但我实际上需要仍然会阻塞当前任务(但不是整个过程)的协程。有没有办法创建包装put_nowait()
/get_nowait()
的协程?另一方面,multiprocessing.Queue
使用的线程是否与在同一进程中运行的事件循环在内部兼容?
如果没有,我还有什么其他选择?我知道我可以通过使用异步套接字自己实现这样的队列,但我希望我可以避免这种情况……
编辑:
我也考虑过使用管道而不是套接字,但似乎asyncio
与multiprocessing.Pipe()
不兼容。更准确地说,Pipe()
返回一个由Connection
对象组成的元组,它们是非 类文件对象。但是,asyncio.BaseEventLoop
的方法add_reader()
/add_writer()
方法和connect_read_pipe()
/connect_write_pipe()
都期望类似文件的对象,因此不可能异步读取/写入这样的Connection
。相比之下,subprocess
包用作管道的通常类似文件的对象完全没有问题,can easily be used in combination with asyncio
。
更新:
我决定进一步探索管道方法:我通过fileno()
检索文件描述符并将其传递给os.fdopen()
,将multiprocessing.Pipe()
返回的Connection
对象转换为类文件对象。最后,我将生成的类文件对象传递给事件循环的connect_read_pipe()
/connect_write_pipe()
。 (如果有人对确切的代码感兴趣,有一些 mailing list discussion 在相关问题上。)但是,read()
ing 流给了我一个 OSError: [Errno 9] Bad file descriptor
,我没有设法解决这个问题。还考虑到missing support for Windows,我不会再追究这个问题了。
【问题讨论】:
子进程是如何启动的? 子进程是通过multiprocessing.Process
创建的。
【参考方案1】:
这是一个multiprocessing.Queue
对象的实现,它可以与asyncio
一起使用。它提供了整个multiprocessing.Queue
接口,添加了coro_get
和coro_put
方法,它们是asyncio.coroutine
s,可用于异步获取/放入队列。实现细节与我其他答案的第二个示例基本相同:ThreadPoolExecutor
用于使 get/put 异步,multiprocessing.managers.SyncManager.Queue
用于在进程之间共享队列。唯一的附加技巧是实现 __getstate__
以保持对象可拾取,尽管使用不可拾取的 ThreadPoolExecutor
作为实例变量。
from multiprocessing import Manager, cpu_count
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
def AsyncProcessQueue(maxsize=0):
m = Manager()
q = m.Queue(maxsize=maxsize)
return _ProcQueue(q)
class _ProcQueue(object):
def __init__(self, q):
self._queue = q
self._real_executor = None
self._cancelled_join = False
@property
def _executor(self):
if not self._real_executor:
self._real_executor = ThreadPoolExecutor(max_workers=cpu_count())
return self._real_executor
def __getstate__(self):
self_dict = self.__dict__
self_dict['_real_executor'] = None
return self_dict
def __getattr__(self, name):
if name in ['qsize', 'empty', 'full', 'put', 'put_nowait',
'get', 'get_nowait', 'close']:
return getattr(self._queue, name)
else:
raise AttributeError("'%s' object has no attribute '%s'" %
(self.__class__.__name__, name))
@asyncio.coroutine
def coro_put(self, item):
loop = asyncio.get_event_loop()
return (yield from loop.run_in_executor(self._executor, self.put, item))
@asyncio.coroutine
def coro_get(self):
loop = asyncio.get_event_loop()
return (yield from loop.run_in_executor(self._executor, self.get))
def cancel_join_thread(self):
self._cancelled_join = True
self._queue.cancel_join_thread()
def join_thread(self):
self._queue.join_thread()
if self._real_executor and not self._cancelled_join:
self._real_executor.shutdown()
@asyncio.coroutine
def _do_coro_proc_work(q, stuff, stuff2):
ok = stuff + stuff2
print("Passing %s to parent" % ok)
yield from q.coro_put(ok) # Non-blocking
item = q.get() # Can be used with the normal blocking API, too
print("got %s back from parent" % item)
def do_coro_proc_work(q, stuff, stuff2):
loop = asyncio.get_event_loop()
loop.run_until_complete(_do_coro_proc_work(q, stuff, stuff2))
@asyncio.coroutine
def do_work(q):
loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
do_coro_proc_work, q, 1, 2)
item = yield from q.coro_get()
print("Got %s from worker" % item)
item = item + 25
q.put(item)
if __name__ == "__main__":
q = AsyncProcessQueue()
loop = asyncio.get_event_loop()
loop.run_until_complete(do_work(q))
输出:
Passing 3 to parent
Got 3 from worker
got 28 back from parent
如您所见,您可以从父进程或子进程同步和异步使用AsyncProcessQueue
。它不需要任何全局状态,并且通过将大部分复杂性封装在一个类中,使用起来比我原来的答案更优雅。
您可能能够直接使用套接字获得更好的性能,但是以跨平台的方式使其工作似乎相当棘手。这还具有可跨多个工人使用的优点,不需要您自己腌制/解封等。
【讨论】:
已接受。 :) 非常感谢您的时间和精力! @balu 没问题。非常有趣的问题!我仍然希望我们在某些时候在标准库中看到asyncio
和multiprocessing
(类似于asyncio.subprocess
)的更好集成,或者至少是asyncio.Queue
的过程安全版本,但现在这似乎是一个不错的权宜之计。
为了它的价值,我最终接受了这个想法并构建了一个完整的库,称为 aioprocessing
,它为所有 multiprocessing
类提供类似的功能。【参考方案2】:
不幸的是,multiprocessing
库并不是特别适合与asyncio
一起使用。但是,根据您计划使用multiprocessing
/multprocessing.Queue
的方式,您可以将其完全替换为concurrent.futures.ProcessPoolExecutor
:
import asyncio
from concurrent.futures import ProcessPoolExecutor
def do_proc_work(stuff, stuff2): # This runs in a separate process
return stuff + stuff2
@asyncio.coroutine
def do_work():
out = yield from loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
do_proc_work, 1, 2)
print(out)
if __name__ == "__main__":
loop = asyncio.get_event_loop()
loop.run_until_complete(do_work())
输出:
3
如果您绝对需要multiprocessing.Queue
,与ProcessPoolExecutor
结合使用时,它的表现似乎还不错:
import asyncio
import time
import multiprocessing
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
def do_proc_work(q, stuff, stuff2):
ok = stuff + stuff2
time.sleep(5) # Artificial delay to show that it's running asynchronously
print("putting output in queue")
q.put(ok)
@asyncio.coroutine
def async_get(q):
""" Calls q.get() in a separate Thread.
q.get is an I/O call, so it should release the GIL.
Ideally there would be a real non-blocking I/O-based
Queue.get call that could be used as a coroutine instead
of this, but I don't think one exists.
"""
return (yield from loop.run_in_executor(ThreadPoolExecutor(max_workers=1),
q.get))
@asyncio.coroutine
def do_work(q):
loop.run_in_executor(ProcessPoolExecutor(max_workers=1),
do_proc_work, q, 1, 2)
coro = async_get(q) # You could do yield from here; I'm not just to show that it's asynchronous
print("Getting queue result asynchronously")
print((yield from coro))
if __name__ == "__main__":
m = multiprocessing.Manager()
q = m.Queue() # The queue must be inherited by our worker, it can't be explicitly passed in
loop = asyncio.get_event_loop()
loop.run_until_complete(do_work(q))
输出:
Getting queue result asynchronously
putting output in queue
3
【讨论】:
我真的需要一个队列,因为这两个进程都在无限期地运行,而且我需要一直将数据从一个进程传递到另一个进程。更具体地说,子进程将查询转发到通过队列到达的 SQLite 数据库。具有讽刺意味的是,这背后的原因是能够通过将它们放入队列并在不同的进程中执行它们来异步运行这些查询(因为 SQLite 调用是阻塞的)。无论如何,您的第二个建议看起来很有趣。不过,我觉得它更高效,考虑到全局状态,只使用套接字更优雅。 顺便问一下:你能解释一下为什么不能明确地将队列传递给工作人员吗? @balu 尝试传递multprocessing.Queue
直接引发RuntimeError: Queue objects should only be shared between processes through inheritance
。在示例代码中继承 Queue
在 Linux 上工作,但我发现它实际上挂在 Windows 上。不过,使用multprocessing.manager.Queue
(可以在进程之间显式传递)似乎适用于所有平台。我已经更新了我的答案以反映这一点。
感谢您的回答。现在我有点困惑,因为Python docs 中的代码示例明确地将multiprocessing.Queue
传递给multiprocessing.Process
。
@balu 是的,我不知道实现是否合法,但您可以将Queue
传递给Process
或Pool
的构造函数(使用initializer
/initargs
关键字参数)。但是,如果您尝试将Queue
传递给pool.apply
调用,它将引发RuntimeError
。看来,如果您在子进程实际启动之前传递队列,则它是允许的。在pool.apply
并将其传递给ProcessPoolExecutor
的情况下,子进程已经启动,因此无法传递Queue
。【参考方案3】:
aiopipe (https://pypi.org/project/aiopipe/) 看起来一针见血。
至少它帮助了我..
【讨论】:
以上是关于我可以以某种方式与子进程共享一个异步队列吗?的主要内容,如果未能解决你的问题,请参考以下文章