如何在异步协程中包装同步函数?

Posted

技术标签:

【中文标题】如何在异步协程中包装同步函数?【英文标题】:How can I wrap a synchronous function in an async coroutine? 【发布时间】:2017-04-05 20:40:03 【问题描述】:

我正在使用aiohttp 构建一个API 服务器,将TCP 请求发送到一个单独的服务器。发送 TCP 请求的模块是同步的,对我来说是一个黑盒子。所以我的问题是这些请求阻塞了整个 API。我需要一种方法将模块请求包装在一个不会阻塞 API 其余部分的异步协程中。

那么,仅以sleep 为例,有没有办法以某种方式将耗时的同步代码包装在非阻塞协程中,如下所示:

async def sleep_async(delay):
    # After calling sleep, loop should be released until sleep is done
    yield sleep(delay)
    return 'I slept asynchronously'

【问题讨论】:

你总是阻塞 I/O。使用协作式多任务处理,您无法获得所需的行为,因为阻塞的协程仅在请求完成后才返回控制(yield)。 aiohttp 适用于 http。对于非 http TCP,asyncio 就足够了。 【参考方案1】:

最终我在this thread 找到了答案。我正在寻找的方法是run_in_executor。这允许同步函数在不阻塞事件循环的情况下异步运行。

在我上面发布的sleep 示例中,它可能看起来像这样:

import asyncio
from time import sleep

async def sleep_async(loop, delay):
    # None uses the default executor (ThreadPoolExecutor)
    await loop.run_in_executor(None, sleep, delay)
    return 'I slept asynchronously'

另请参阅以下答案-> How do we call a normal function where a coroutine is expected?

【讨论】:

ProcessPoolExecutor 成本很高,因为它启动了一个全新的 python 解释器。当您有需要使用多个处理器的 CPU 密集型任务时使用它。考虑改用ThreadPoolExecutor,它使用线程。 感谢您提供更多信息。虽然最初的例子使用了进程池,ThreadPoolExecutor 是我经过更多研究后最终使用的。看起来还是有点笨拙,但到目前为止一切都很好。 请注意,与其创建新的执行器,不如通过调用loop.run_in_executor(executor=None, func, *args)(参见documentation)来使用默认执行器可能更简单。 要获得事件循环,可以这样做loop = asyncio.get_event_loop()【参考方案2】:

您可以使用装饰器将同步版本包装为异步版本。

import time
from functools import wraps, partial


def wrap(func):
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        pfunc = partial(func, *args, **kwargs)
        return await loop.run_in_executor(executor, pfunc)
    return run

@wrap
def sleep_async(delay):
    time.sleep(delay)
    return 'I slept asynchronously'

或使用aioify 库

% pip install aioify

然后

@aioify
def sleep_async(delay):
    pass

【讨论】:

建议使用aioify,它让编写异步函数和模块变得如此简单:) 即使在使用aiofy函数时,如果函数本身是长时间运行的阻塞操作,它仍然可能阻塞事件循环。在这种情况下,我们需要并行性而不是并发性。除非有办法让在长时间运行的操作中返回到循环。【参考方案3】:

装饰器对这种情况很有用,并在另一个线程中运行您的阻塞函数。

import asyncio
from concurrent.futures import ThreadPoolExecutor
from functools import wraps, partial
from typing import Union

class to_async:

    def __init__(self, *, executor: Optional[ThreadPoolExecutor]=None):
       
        self.executor =  executor
    
    def __call__(self, blocking):
        @wraps(blocking)
        async def wrapper(*args, **kwargs):

            loop = asyncio.get_event_loop()
            if not self.executor:
                self.executor = ThreadPoolExecutor()

            func = partial(blocking, *args, **kwargs)
        
            return await loop.run_in_executor(self.executor,func)

        return wrapper

@to_async(executor=None)
def sync(*args, **kwargs):
    print(args, kwargs)
   
asyncio.run(sync("hello", "world", result=True))

【讨论】:

Union[None, ...] - None 的并集是可选的[...] 不是可选的[None,ThreadPoolExecutor],只是可选的[ThreadPoolExecutor] =)【参考方案4】:

也许有人需要我解决这个问题。我编写了自己的库来解决这个问题,它允许您使用装饰器使任何函数异步。

要安装库,请运行以下命令:

$ pip install awaits

要使您的任何函数异步,只需将 @awaitable 装饰器添加到其中,如下所示:

import time
import asyncio
from awaits.awaitable import awaitable

@awaitable
def sum(a, b):
  # heavy load simulation
  time.sleep(10)
  return a + b

现在你可以确定你的函数是真正的异步协程了:

print(asyncio.run(sum(2, 2)))

“幕后”你的函数将在线程池中执行。每次调用函数时都不会重新创建此线程池。线程池创建一次并通过队列接受新任务。这将使您的程序比使用其他解决方案运行得更快,因为创建额外的线程是额外的开销。

【讨论】:

【参考方案5】:

不确定是否为时已晚,但您也可以使用装饰器在线程中执行您的功能。尽管如此,请注意它仍然是非合作阻塞,不像异步是合作阻塞。

def wrap(func):
    from concurrent.futures import ThreadPoolExecutor
    pool=ThreadPoolExecutor()
    @wraps(func)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        future=pool.submit(func, *args, **kwargs)
        return asyncio.wrap_future(future)
    return run

【讨论】:

以上是关于如何在异步协程中包装同步函数?的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程Flow 异步流 ① ( 以异步返回返回多个返回值 | 同步调用返回多个值的弊端 | 尝试在 sequence 中调用挂起函数返回多个返回值 | 协程中调用挂起函数返回集合 )

Kotlin 协程Flow 异步流 ① ( 以异步返回返回多个返回值 | 同步调用返回多个值的弊端 | 尝试在 sequence 中调用挂起函数返回多个返回值 | 协程中调用挂起函数返回集合 )

Swoole 中使用 TCP 异步服务器TCP 协程服务器TCP 同步客户端TCP 协程客户端

python学习道路(day11note)(协程,同步与异步的性能区别,url爬网页,select,RabbitMq)

协程与异步IO

python协程和异步IO——IO多路复用