asyncio时间循环中运行阻塞任务

Posted btxlc

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了asyncio时间循环中运行阻塞任务相关的知识,希望对你有一定的参考价值。

 场景:

在某个异步循环中 需要执行某个阻塞任务(例如文件读写., 保存图片等)

如果这个时候直接在异步事件循环中直接运行, 那么所有任务都会阻塞在这里, 明显是不行的

 

解决方案:

https://docs.python.org/3/library/asyncio-eventloop.html#executing-code-in-thread-or-process-pools

 

import concurrent.futures

def blocking_io():
    # File operations (such as logging) can block the
    # event loop: run them in a thread pool.
    with open(/dev/urandom, rb) as f:
        return f.read(100)

# 在某个异步事件循环中
async def main():
    loop = asyncio.get_running_loop()  # 获取当前事件循环

    result = await loop.run_in_executor(None, blocking_io)

    with concurrent.futures.ThreadPoolExecutor() as pool:
          result = await loop.run_in_executor(
            pool, blocking_io)
          print(custom thread pool, result)

asyncio.run(main())

 

loop.run_in_exeutor(pool, func)

pool为线程池 在python里面为concurrent.futures.ThreadPoolExecutor的实例

func为需要运行的同步函数 如果需要传入参数, 要用functools.partial(func, *arg, **kwargs)  传入.

 

运行之后, 则会在不阻塞异步时间循环的情况下 ,在新的线程中运行func 结束之后用await接收

 

如果一开始不指定线程池,那么官方文档中说的是默认线程池

这里的默认线程池可以用

loop.set_default_executor(executor)

来指定

如果一开始也不指定, 那么经过阅读源码之后 可知:

    def run_in_executor(self, executor, func, *args):
        self._check_closed()
        if self._debug:
            self._check_callback(func, run_in_executor)
        if executor is None:
            executor = self._default_executor
            if executor is None:
                executor = concurrent.futures.ThreadPoolExecutor()
                self._default_executor = executor
        return futures.wrap_future(
            executor.submit(func, *args), loop=self)

他会帮我们自动启用一个线程池, 丢进去运行.

 

总之异步的时间循环中如果要加入阻塞型代码,  千万不能直接阻塞, 用此法可破.

以上是关于asyncio时间循环中运行阻塞任务的主要内容,如果未能解决你的问题,请参考以下文章

使用 asyncio 时,如何让所有正在运行的任务在关闭事件循环之前完成

9Python Asyncio异步编程-事件循环详解

8Python Asyncio异步编程-事件循环详解

《asyncio 系列》4. 如何并发运行多个任务(asyncio.gatherasyncio.as_completedasyncio.wait)

Asyncio之EventLoop笔记

gj13 asyncio并发编程