如何将python asyncio与线程结合起来?

Posted

技术标签:

【中文标题】如何将python asyncio与线程结合起来?【英文标题】:How to combine python asyncio with threads? 【发布时间】:2015-02-13 03:42:00 【问题描述】:

我已经使用 Python asyncio 和 aiohttp 成功构建了一个 RESTful microservice,它侦听 POST 事件以收集来自各种 feeder 的实时事件。

然后它会构建一个内存结构,以在嵌套的 defaultdict/deque 结构中缓存最近 24 小时的事件。

现在我想定期检查该结构到磁盘,最好使用pickle。

由于内存结构可能 >100MB,因此我希望避免在检查点结构所需的时间内暂停我的传入事件处理。

我宁愿创建结构的快照副本(例如 deepcopy),然后花时间将其写入磁盘并在预设的时间间隔内重复。

我一直在寻找有关如何组合线程(线程甚至是最好的解决方案吗?)和 asyncio 用于此目的的示例,但找不到对我有帮助的东西。

非常感谢任何入门指南!

【问题讨论】:

我使用了 dano 的建议并构建了一个非常简单的多线程设置,该设置每 60 秒检查一次内存中的事件存储到磁盘。这是包含整个逻辑的 git repo 文件的链接:github.com/fxstein/SentientHome/blob/master/engine/… 【参考方案1】:

使用BaseEventLoop.run_in_executor 将方法委托给线程或子进程非常简单:

import asyncio
import time
from concurrent.futures import ProcessPoolExecutor

def cpu_bound_operation(x):
    time.sleep(x) # This is some operation that is CPU-bound

@asyncio.coroutine
def main():
    # Run cpu_bound_operation in the ProcessPoolExecutor
    # This will make your coroutine block, but won't block
    # the event loop; other coroutines can run in meantime.
    yield from loop.run_in_executor(p, cpu_bound_operation, 5)


loop = asyncio.get_event_loop()
p = ProcessPoolExecutor(2) # Create a ProcessPool with 2 processes
loop.run_until_complete(main())

至于是使用ProcessPoolExecutor还是ThreadPoolExecutor,这很难说;腌制一个大对象肯定会消耗一些 CPU 周期,这最初会让你认为ProcessPoolExecutor 是要走的路。但是,将 100MB 对象传递给池中的 Process 将需要在主进程中对实例进行酸洗,通过 IPC 将字节发送到子进程,在子进程中将其取消酸洗,然后再次酸洗它 em> 这样您就可以将其写入磁盘。鉴于此,我的猜测是酸洗/取消酸洗的开销会足够大,因此您最好使用ThreadPoolExecutor,即使您会因为 GIL 而受到性能影响。

也就是说,测试两种方法并确定找出答案非常简单,所以你不妨这样做。

【讨论】:

谢谢达诺!毕竟这要容易得多。你是对的,我选择了使用 ThreadPoolExecutor 的路径,它工作正常。现在每 60 秒编写一次检查点,而不会阻止任何事件处理。【参考方案2】:

我也使用了run_in_executor,但我发现这个函数在大多数情况下有点恶心,因为它需要partial() 作为关键字参数,而且除了单个执行程序和默认事件循环之外,我从不使用任何东西调用它。所以我用合理的默认值和自动关键字参数处理围绕它做了一个方便的包装器。

from time import sleep
import asyncio as aio
loop = aio.get_event_loop()

class Executor:
    """In most cases, you can just use the 'execute' instance as a
    function, i.e. y = await execute(f, a, b, k=c) => run f(a, b, k=c) in
    the executor, assign result to y. The defaults can be changed, though,
    with your own instantiation of Executor, i.e. execute =
    Executor(nthreads=4)"""
    def __init__(self, loop=loop, nthreads=1):
        from concurrent.futures import ThreadPoolExecutor
        self._ex = ThreadPoolExecutor(nthreads)
        self._loop = loop
    def __call__(self, f, *args, **kw):
        from functools import partial
        return self._loop.run_in_executor(self._ex, partial(f, *args, **kw))
execute = Executor()

...

def cpu_bound_operation(t, alpha=30):
    sleep(t)
    return 20*alpha

async def main():
    y = await execute(cpu_bound_operation, 5, alpha=-2)

loop.run_until_complete(main())

【讨论】:

【参考方案3】:

另一种选择是使用loop.call_soon_threadsafeasyncio.Queue 作为通信的中间渠道。

Python 3 的当前文档也有一个关于 Developing with asyncio - Concurrency and Multithreading 的部分:

import asyncio

# This method represents your blocking code
def blocking(loop, queue):
    import time
    while True:
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking A')
        time.sleep(2)
        loop.call_soon_threadsafe(queue.put_nowait, 'Blocking B')
        time.sleep(2)

# This method represents your async code
async def nonblocking(queue):
    await asyncio.sleep(1)
    while True:
        queue.put_nowait('Non-blocking A')
        await asyncio.sleep(2)
        queue.put_nowait('Non-blocking B')
        await asyncio.sleep(2)

# The main sets up the queue as the communication channel and synchronizes them
async def main():
    queue = asyncio.Queue()
    loop = asyncio.get_running_loop()

    blocking_fut = loop.run_in_executor(None, blocking, loop, queue)
    nonblocking_task = loop.create_task(nonblocking(queue))

    running = True  # use whatever exit condition
    while running:
        # Get messages from both blocking and non-blocking in parallel
        message = await queue.get()
        # You could send any messages, and do anything you want with them
        print(message)

asyncio.run(main())

如何send asyncio tasks to loop running in other thread 也可能对您有所帮助。

【讨论】:

您与来自另一个线程的queue 交互的唯一方式是通过loop.call_soon_threadsafe,它“必须用于调度来自另一个线程的回调。”据我了解,这是安全的方法,但我可能误解了文档。

以上是关于如何将python asyncio与线程结合起来?的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Celery 与 asyncio 结合使用?

在自己的线程中运行事件循环

如何将 asyncio 与 boost.python 一起使用?

asyncio:等待来自其他线程的事件

python 并发专题(十四):asyncio 实战

python 并发专题(十四):asyncio 实战