跨多进程共享基于异步等待协程的复杂对象

Posted

技术标签:

【中文标题】跨多进程共享基于异步等待协程的复杂对象【英文标题】:Share async-await coroutine based complex object across multiprocess 【发布时间】:2018-02-01 04:37:54 【问题描述】:

我一般都知道,对象不应该在多进程之间共享以及由此产生的问题。但我的要求是必须这样做。

我有一个复杂的对象,其中包含所有不错的协程 async-await。 在它自己的单独进程中在此对象上运行长时间运行的进程的函数。现在,我想在主进程中运行一个 IPython shell,并在这个长时间运行的进程在另一个进程中运行时对这个复杂的对象进行操作。

为了跨进程共享这个复杂的对象,我尝试了我在 SO 上遇到的多处理 BaseManager 方法:

import multiprocessing
import multiprocessing.managers as m


class MyManager(m.BaseManager):
    pass

MyManager.register('complex_asynio_based_class', complex_asynio_based_class)
manager = MyManager()
manager.start()
c = manager.complex_asynio_based_class()

process = multiprocessing.Process(
     target=long_running_process,
     args=(c,),
)

但这给出了错误:

Unserializable message: Traceback (most recent call last):
  File "/usr/3.6/lib/python3.6/multiprocessing/managers.py", line 283, in serve_client
    send(msg)
  File "/usr/3.6/lib/python3.6/multiprocessing/connection.py", line 206, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/3.6/lib/python3.6/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
TypeError: can't pickle coroutine objects

它不起作用,因为对象中有协程。我想不出更好的解决方案来让它发挥作用,我一直坚持下去。

如果不是 Python,我会为长时间运行的进程生成一个线程,并且仍然可以对其进行操作。

如果我没记错的话,这应该是多进程应用程序运行后台进程的常见模式,而主进程只对其进行一些只读操作,就像我的情况一样,而不是修改它。我想知道一般是怎么做的?

无法选择的复杂对象如何在多进程之间共享?

【问题讨论】:

我问错了吗? Downvoters 我会要求也发表评论说什么是错的。 你不能共享协程,因为只有一个事件循环,并且它在拥有对象的进程中。您可以做的是使用诸如“开始执行此操作并返回令牌”和“检查令牌 表示的操作是否已完成”等方法在您的异步类上创建一个同步外观。您可以调用这些操作其他过程没有任何问题。桥接异步操作以便它们在 所有 进程(每个进程都有自己的事件循环)中是异步的是一个挑战,但如果你提供你的类的一个最小示例,它也可能是可能的。 如果不是 Python,我会为长时间运行的进程生成一个线程并且仍然可以对其进行操作。 为什么不要你产生一个线程?为什么你的用例不能接受线程(可能是由于 GIL),但是显式单线程 asyncio 可以满足它? @user4815162342 谢谢。是的,不跨进程共享协程是有意义的。 你为什么不产生一个线程?我的要求是项目中有多个组件,每个部分都在他们的进程中工作,而其他进程大多只是读取它。 这很清楚,但同样适用于 Python 以外的语言。 【参考方案1】:

正在运行的协程不能在进程之间自动共享,因为协程在拥有异步类的进程中的特定事件循环内运行。协程具有无法腌制的状态,即使可以腌制,在其事件循环的上下文之外也没有任何意义。

您可以做的是为您的异步类创建一个基于回调的适配器,每个协程方法都由一个基于回调的方法表示,其语义为“开始执行 X 并在完成时调用此函数”。如果回调是多处理感知的,则可以从其他进程调用这些操作。然后,您可以在 each 进程中启动一个事件循环,并在代理的基于回调的调用上创建一个协程外观。

例如,考虑一个简单的异步类:

class Async:
    async def repeat(self, n, s):
        for i in range(n):
            print(s, i, os.getpid())
            await asyncio.sleep(.2)
        return s

基于回调的适配器可以使用公共asyncio API 将repeat 协程转换为javascript“回调地狱”风格的经典异步函数:

class CallbackAdapter:
    def repeat_start(self, n, s, on_success):
        fut = asyncio.run_coroutine_threadsafe(
            self._async.repeat(n, s), self._loop)
        # Once the coroutine is done, notify the caller.
        fut.add_done_callback(lambda _f: on_success(fut.result()))

(可以自动转换,上面手动写的代码只是说明概念。)

CallbackAdapter 可以注册到多处理,因此不同的进程可以通过多处理提供的代理启动适配器的方法(以及原始异步协程)。这只要求作为on_success 传递的回调是多处理友好的。

作为最后一步,可以为基于回调的 API (!) 创建一个异步适配器,在另一个进程中启动一个事件循环,并使用 asyncio 和async def。这个adapter-for-adapter类将运行一个功能齐全的repeat协程,它有效地代理原始Async.repeat协程,而无需尝试腌制协程状态。

这是上述方法的示例实现:

import asyncio, multiprocessing.managers, threading, os

class Async:
    # The async class we are bridging.  This class is unaware of multiprocessing
    # or of any of the code that follows.
    async def repeat(self, n, s):
        for i in range(n):
            print(s, i, 'pid', os.getpid())
            await asyncio.sleep(.2)
        return s


def start_asyncio_thread():
    # Since the manager controls the main thread, we have to spin up the event
    # loop in a dedicated thread and use asyncio.run_coroutine_threadsafe to
    # submit stuff to the loop.
    setup_done = threading.Event()
    loop = None
    def loop_thread():
        nonlocal loop
        loop = asyncio.new_event_loop()
        asyncio.set_event_loop(loop)
        setup_done.set()
        loop.run_forever()
    threading.Thread(target=loop_thread).start()
    setup_done.wait()
    return loop

class CallbackAdapter:
    _loop = None

    # the callback adapter to the async class, also running in the
    # worker process
    def __init__(self, obj):
        self._async = obj
        if CallbackAdapter._loop is None:
            CallbackAdapter._loop = start_asyncio_thread()

    def repeat_start(self, n, s, on_success):
        # Submit a coroutine to the event loop and obtain a Task/Future.  This
        # is normally done with loop.create_task, but repeat_start will be
        # called from the main thread, owned by the multiprocessng manager,
        # while the event loop will run in a separate thread.
        future = asyncio.run_coroutine_threadsafe(
            self._async.repeat(n, s), self._loop)
        # Once the coroutine is done, notify the caller.
        # We could propagate exceptions by accepting an additional on_error
        # callback, and nesting fut.result() in a try/except that decides
        # whether to call on_success or on_error.
        future.add_done_callback(lambda _f: on_success(future.result()))


def remote_event_future(manager):
    # Return a function/future pair that can be used to locally monitor an
    # event in another process.
    #
    # The returned function and future have the following property: when the
    # function is invoked, possibly in another process, the future completes.
    # The function can be passed as a callback argument to a multiprocessing
    # proxy object and therefore invoked by a different process.
    loop = asyncio.get_event_loop()
    result_pipe = manager.Queue()
    future = loop.create_future()
    def _wait_for_remote():
        result = result_pipe.get()
        loop.call_soon_threadsafe(future.set_result, result)
    t = threading.Thread(target=_wait_for_remote)
    t.start()
    return result_pipe.put, future


class AsyncAdapter:
    # The async adapter for a callback-based API, e.g. the CallbackAdapter.
    # Designed to run in a different process and communicate to the callback
    # adapter via a multiprocessing proxy.
    def __init__(self, cb_proxy, manager):
        self._cb = cb_proxy
        self._manager = manager

    async def repeat(self, n, s):
        set_result, future = remote_event_future(self._manager)
        self._cb.repeat_start(n, s, set_result)
        return await future


class CommManager(multiprocessing.managers.SyncManager):
    pass

CommManager.register('Async', Async)
CommManager.register('CallbackAdapter', CallbackAdapter)


def get_manager():
    manager = CommManager()
    manager.start()
    return manager

def other_process(manager, cb_proxy):
    print('other_process (pid %d)' % os.getpid())
    aadapt = AsyncAdapter(cb_proxy, manager)
    loop = asyncio.get_event_loop()
    # Create two coroutines printing different messages, and gather their
    # results.
    results = loop.run_until_complete(asyncio.gather(
        aadapt.repeat(3, 'message A'),
        aadapt.repeat(2, 'message B')))
    print('coroutine results (pid %d): %s' % (os.getpid(), results))
    print('other_process (pid %d) done' % os.getpid())

def start_other_process(loop, manager, async_proxy):
    cb_proxy = manager.CallbackAdapter(async_proxy)
    other = multiprocessing.Process(target=other_process,
                                    args=(manager, cb_proxy,))
    other.start()
    return other

def main():
    loop = asyncio.get_event_loop()
    manager = get_manager()
    async_proxy = manager.Async()
    # Create two external processes that drive coroutines in our event loop.
    # Note that all messages are printed with the same PID.
    start_other_process(loop, manager, async_proxy)
    start_other_process(loop, manager, async_proxy)
    loop.run_forever()

if __name__ == '__main__':
    main()

代码在 Python 3.5 上正常运行,但在 3.6 和 3.7 上由于a bug in multiprocessing 而失败。

【讨论】:

这里涉及的多处理概念太多了,我花了一些时间,因为我还没有完全理解。我尝试了这种方法,但是在使用 python 3.5 时出现错误:mycodebuddy.herokuapp.com/pastepad/bFvt0Zxvo6 我注意到这是在以 forkmultiprocessing.get_context('spawn') 启动新进程时出现的错误:mycodebuddy.herokuapp.com/pastepad/MtU91UCKlI @AmitTripathi 我注意到您似乎是从 IPython 开始编写代码的。如果将其保存在以.py 结尾的文件中并以python x.py 运行它是否有效?另外,您使用的是哪个操作系统? (没关系,但在我的系统上,事件循环使用 epoll 进行轮询,而您的似乎使用 kqueue。) 将它作为 Python 文件运行仍然给我同样的错误。我正在使用 MacOs。刚刚在ubuntu系统上测试过,效果很好! @AmitTripathi 该队列仅使用一次,用于传输已调用远程回调的单个消息。这发生在原始协程完成时(它还用于传输返回值,否则我会使用 manager.Event()event.set 作为回调。)一旦发生这种情况,本地未来被标记为完成,导致协程等待它完成。 关于多处理,我现在才更好地了解它 - 在遇到您的问题之前,我什至不了解管理器。此外,如果有人想出一种更好的方法来连接异步功能,那么他们就值得拥有声誉和公认的旗帜。感谢您提出有趣的问题。【参考方案2】:

我使用多处理模块和异步模块已经有一段时间了。

您不会在进程之间共享对象。您在一个进程中创建一个对象(引用),返回一个代理对象并与其他进程共享它。其他进程使用代理对象来调用引用的方法。

在您的代码中,所指对象是 complex_asynio_based_class 实例。

这是您可以参考的愚蠢代码。主线程是一个运行 UDP 服务器和其他异步操作的单个异步循环。长时间运行的过程简单地检查循环状态。

import multiprocessing
import multiprocessing.managers as m
import asyncio 
import logging
import time 

logging.basicConfig(filename="main.log", level=logging.DEBUG) 

class MyManager(m.BaseManager):
    pass

class sinkServer(asyncio.Protocol):


    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        logging.info('Data received: !r'.format(message))


class complex_asynio_based_class:

    def __init__(self, addr=('127.0.0.1', '8080')):
        self.loop = asyncio.new_event_loop() 
        listen = self.loop.create_datagram_endpoint(sinkServer, local_addr=addr,
                    reuse_address=True, reuse_port=True)
        self.loop.run_until_complete(listen)
        for name, delay in zip("abcdef", (1,2,3,4,5,6)):
            self.loop.run_until_complete(self.slow_op(name, delay))

    def run(self):
        self.loop.run_forever() 

    def stop(self):
        self.loop.stop() 

    def is_running(self):
        return self.loop.is_running() 

    async def slow_op(self, name, delay):
        logging.info("my name: ".format(name))
        asyncio.sleep(delay)

def long_running_process(co):
    logging.debug('address: !r'.format(co))
    logging.debug("status: ".format(co.is_running()))
    time.sleep(6)
    logging.debug("status: ".format(co.is_running()))

MyManager.register('complex_asynio_based_class', complex_asynio_based_class)
manager = MyManager()
manager.start()
c = manager.complex_asynio_based_class()

process = multiprocessing.Process(
     target=long_running_process,
     args=(c,),
)
process.start()

c.run()  #run the loop

【讨论】:

代理同步方法很清楚,但是 OP 希望代理对 async 方法的调用,例如等待co.slow_op() 来自long_running_process。这是行不通的,因为async def 的“结果”是multiprocessing 将尝试腌制的协程。 这是我使用的方法。它不适用于协程。 我明白了。我阅读了@user4815162342 提供的代码。不错。 基本思想在示例中得到了很好的阐述。 1. 长时间运行的进程等待共享值(您的示例中的队列)并将结果设置为未来。 2. 主进程设置未来回调将结果放入共享值。通过将第 2 步注册到管理器,将其包装为 shared 非异步可调用函数。 AsyncAdapter.repeat 列出了这两个步骤。

以上是关于跨多进程共享基于异步等待协程的复杂对象的主要内容,如果未能解决你的问题,请参考以下文章

简述python进程,线程和协程的区别及应用场景

Python黑魔法 --- 异步IO( asyncio) 协程

进程线程和协程的区别

Python3 异步编程之进程与线程-1

Python黑魔法 --- 异步IO( asyncio) 协程

进程线程和协程的理解-自己随笔