将 asyncio 与多处理结合起来会出现啥样的问题(如果有的话)?

Posted

技术标签:

【中文标题】将 asyncio 与多处理结合起来会出现啥样的问题(如果有的话)?【英文标题】:What kind of problems (if any) would there be combining asyncio with multiprocessing?将 asyncio 与多处理结合起来会出现什么样的问题(如果有的话)? 【发布时间】:2014-01-16 10:25:33 【问题描述】:

当他们第一次看到 Python 中的线程时,几乎每个人都知道,对于那些真正想要并行处理的人来说,GIL 让他们的生活变得悲惨——或者至少给它一个机会。

我目前正在考虑实现类似反应堆模式的东西。实际上,我想在一个类线程上监听传入的套接字连接,当有人尝试连接时,接受该连接并将其传递给另一个类线程进行处理。

我(还)不确定我可能面临什么样的负载。我知道目前对传入消息设置了 2MB 的上限。从理论上讲,我们每秒可以得到数千(尽管我不知道实际上我们是否见过类似的东西)。处理消息所花费的时间并不非常重要,但显然越快越好。

我正在研究 Reactor 模式,并使用 multiprocessing 库开发了一个小示例,该库(至少在测试中)似乎工作得很好。但是,现在/很快我们将拥有可用的 asyncio 库,它将为我处理事件循环。

asynciomultiprocessing 结合起来有什么可以咬我的吗?

【问题讨论】:

【参考方案1】:

您应该能够安全地组合 asynciomultiprocessing 而不会有太多麻烦,尽管您不应该直接使用 multiprocessingasyncio(以及任何其他基于事件循环的异步框架)的主要罪过是阻塞事件循环。如果你尝试直接使用multiprocessing,任何时候你阻塞等待子进程,你都会阻塞事件循环。显然,这很糟糕。

避免这种情况的最简单方法是使用BaseEventLoop.run_in_executor 执行concurrent.futures.ProcessPoolExecutor 中的函数。 ProcessPoolExecutor 是使用multiprocessing.Process 实现的进程池,但asyncio 内置支持在其中执行函数而不阻塞事件循环。这是一个简单的例子:

import time
import asyncio
from concurrent.futures import ProcessPoolExecutor

def blocking_func(x):
   time.sleep(x) # Pretend this is expensive calculations
   return x * 5

@asyncio.coroutine
def main():
    #pool = multiprocessing.Pool()
    #out = pool.apply(blocking_func, args=(10,)) # This blocks the event loop.
    executor = ProcessPoolExecutor()
    out = yield from loop.run_in_executor(executor, blocking_func, 10)  # This does not
    print(out)

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(main())

在大多数情况下,仅此功能就足够了。如果您发现自己需要来自multiprocessing 的其他构造,例如QueueEventManager 等,有一个名为aioprocessing 的第三方库(完全披露:我写的),它提供asyncio-所有multiprocessing 数据结构的兼容版本。这是一个演示示例:

import time
import asyncio
import aioprocessing
import multiprocessing

def func(queue, event, lock, items):
    with lock:
        event.set()
        for item in items:
            time.sleep(3)
            queue.put(item+5)
    queue.close()

@asyncio.coroutine
def example(queue, event, lock):
    l = [1,2,3,4,5]
    p = aioprocessing.AioProcess(target=func, args=(queue, event, lock, l)) 
    p.start()
    while True:
        result = yield from queue.coro_get()
        if result is None:
            break
        print("Got result ".format(result))
    yield from p.coro_join()

@asyncio.coroutine
def example2(queue, event, lock):
    yield from event.coro_wait()
    with (yield from lock):
        yield from queue.coro_put(78)
        yield from queue.coro_put(None) # Shut down the worker

if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    queue = aioprocessing.AioQueue()
    lock = aioprocessing.AioLock()
    event = aioprocessing.AioEvent()
    tasks = [ 
        asyncio.async(example(queue, event, lock)),
        asyncio.async(example2(queue, event, lock)),
    ]   
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()

【讨论】:

io 事件循环在主进程中,如果我想在子进程中通过套接字发送/接收,我该怎么做?我发现我不能简单地调用 main_proc_loop.ensure_future(send_socket_data...) 因为它们处于不同的进程中?实现它的最佳方法是什么?通过队列? dano 包稳定吗?是否有新功能或错误修复 - 是否仍在维护? Github 说 2018 年 7 月 - 只是检查。我认为它是活生生的? @DtechNet 应该是稳定的。除非人们开始要求它们,否则我不打算添加任何新功能。如果我收到错误报告或 PR,我打算修复/合并它们。该项目没有很多活动,因为维护它没有太多工作要做。 multiprocessing API 非常稳定,因此即使新版本的 Python 出现,它通常也能继续工作。【参考方案2】:

是的,有很多可能会(或可能不会)咬你。

当您运行asyncio 之类的东西时,它希望在一个线程或进程上运行。这(本身)不适用于并行处理。您必须以某种方式分配工作,同时将 IO 操作(特别是套接字上的操作)留在单个线程/进程中。 虽然将单个连接移交给不同的处理程序进程的想法很好,但很难实现。第一个障碍是您需要一种方法将连接从asyncio 中拉出而不关闭它。下一个障碍是您不能简单地将文件描述符发送到不同的进程,除非您使用来自 C 扩展的平台特定(可能是 Linux)代码。 请注意,multiprocessing 模块已知会创建多个用于通信的线程。大多数时候,当您使用通信结构(例如Queues)时,会产生一个线程。不幸的是,这些线程并非完全不可见。例如,它们可能无法彻底拆除(当您打算终止程序时),但根据它们的数量,资源使用可能会自行引起注意。

如果您真的打算在单个进程中处理单个连接,我建议您检查不同的方法。例如,您可以将套接字置于侦听模式,然后同时并行接受来自多个工作进程的连接。一旦工作人员完成处理请求,它就可以接受下一个连接,因此与为每个连接分叉一个进程相比,您仍然使用更少的资源。 Spamassassin 和 Apache (mpm prefork) 可以使用这种工作模型。根据您的用例,它最终可能会更容易和更健壮。具体来说,您可以让您的工作人员在服务于配置数量的请求后死亡,并由主进程重新生成,从而消除内存泄漏的大部分负面影响。

【讨论】:

我想我的问题有点模棱两可——当我提到我会将它发送到类似线程时,我实际上的意思是它们是单独的事件循环。【参考方案3】:

参见 PEP 3156,尤其是线程交互部分:

http://www.python.org/dev/peps/pep-3156/#thread-interaction

这清楚地记录了您可能使用的新异步方法,包括 run_in_executor()。注意Executor是在concurrent.futures中定义的,建议你也看看那里。

【讨论】:

以上是关于将 asyncio 与多处理结合起来会出现啥样的问题(如果有的话)?的主要内容,如果未能解决你的问题,请参考以下文章

iPhone上的“解锁幻灯片”文本中会出现啥样的动画?

听说python可以写自动办公脚本。那能达到一个啥样的效果呢?或者说在啥样的环境下,会比手动处理更好

“版本文件”是啥样的?

请问在Unity的像素Shader里返回的颜色的计算模式是啥样的?

支付网关 API 登录和交易密钥会造成啥样的损害?

你会用啥样的测试来测试这个