使用 asyncio 协程并行运行函数?

Posted

技术标签:

【中文标题】使用 asyncio 协程并行运行函数?【英文标题】:Use asyncio coroutine to run functions in parallel? 【发布时间】:2021-01-27 00:13:52 【问题描述】:

我有以下代码从数据库(read_db)读取数据并将数据写入镶木地板文件(data.to_parquet)。两个 I/O 操作都需要一段时间才能运行。

def main():
    while id < 1000:
       logging.info(f'reading - id: id')
       data = read_db(id) # returns a dataframe

       logging.info(f'saving - id: id')
       data.to_parquet(f'id.parquet')
       logging.info(f'saved - id: id')

       id += 1
       

它很慢,所以我希望 read_db(n+1)to_parquet(n) 同时运行。我需要保持id 的每个步骤按顺序完成(read_db(n+1) 需要在read_db(n) 之后运行,data.to_parquet(n+1)data.to_parquet(n) 之后运行。)。这是异步版本

def async_wrap(f):
    @wraps(f)
    async def run(*args, loop=None, executor=None, **kwargs):
        if loop is None:
            loop = asyncio.get_event_loop()
        p = partial(f, *args, **kwargs)
        return await loop.run_in_executor(executor, p)
    return run

async def main():
    read_db_async = async_wrap(read_db)
    while id < 1000:
       logging.info(f'reading - id: id')
       data = await read_db_async(id) # returns a dataframe

       logging.info(f'saving - id: id')
       to_parquet_async = async_wrap(data.to_parquet)
       await data.to_parquet(f'id.parquet')
       logging.info(f'saved - id: id')

       id += 1

asyncio.get_event_loop().run_until_complete(main())

我只是看到一些乱序的日志:

reading - id: 1
saving - id: 1      (saving 1 and reading 2 run in parallel)
reading - id: 2
saved - id: 1
saving - id: 2
reading - id: 3
saved - id: 2
.....

但是,实际上日志和同步代码是一样的吗?

reading - id: 1
saving - id: 1
saved - id: 1
reading - id: 2
saving - id: 2
saved - id: 2
reading - id: 3
.....

【问题讨论】:

这能回答你的问题吗? How to run tasks concurrently in asyncio? 不,答案是并行运行所有任务,这是我需要避免的。我只需要并行运行一些步骤。 您可以并行运行任意数量的任务。不一定是全部。 您的解决方案中只有一个协程在运行,即以main() 开头的协程。 await data.to_parquet(f'id.parquet') 表示当前协程将 sleep 直到 to_parquet 完成,因此在此之前它不会开始下一次迭代。查看this question 获取基本示例 编辑:我说“只有一个协程在运行”,但这并不完全准确,因为异步调用会创建新的协程。问题是您正在等待那些新的协程完成以恢复原来的协程。 【参考方案1】:

您可以使用gather 或等效项使read_db(n+1)to_parquet(n) 同时运行:

async def main():
    read_db_async = async_wrap(read_db)
    prev_to_parquet = asyncio.sleep(0)  # no-op

    for id in range(1, 1000):
        data, _ = await asyncio.gather(read_db_async(id), prev_to_parquet)
        to_parquet_async = async_wrap(data.to_parquet)
        prev_to_parquet = to_parquet_async(f'id.parquet')

    await prev_to_parquet

【讨论】:

我还需要read_db(n+1)read_db(n) 之后运行,data.to_parquet(n+1)data.to_parquet(n) 之后运行。 非常好。实际上,多个read_db(..) 可以并行运行。它需要大部分时间。但是,我不能有超过 3 个read_db(),因为每个都占用大量内存,如果在数据帧中读取太多数据,就会没有足够的内存。 @ca9163d9 当然,您可以使用相同的原理来并行化任意数量的操作。您甚至可以使用以前版本的答案并添加Semaphore(3) 以防止过多的并行读取。我将保留此版本,因为它忠实地回答了所提出的问题(在编辑之后),并且足够简单,可以作为更复杂用例的基础。 我创建了一个新问题***.com/questions/65922160/…。

以上是关于使用 asyncio 协程并行运行函数?的主要内容,如果未能解决你的问题,请参考以下文章

python asyncio 异步 I/O - 协程(Coroutine)与运行

IO框架:asyncio 上篇

异步IO框架:asyncio 中篇

python协程asyncio:async与await

python协程asyncio:async与await

使用 asyncio.gather() 的协程/期货的经过时间