Python协程中的并行异步IO

Posted

技术标签:

【中文标题】Python协程中的并行异步IO【英文标题】:Parallel asynchronous IO in Python's coroutines 【发布时间】:2017-11-08 00:10:29 【问题描述】:

简单示例:我需要并行发出两个不相关的 HTTP 请求。最简单的方法是什么?我希望它是这样的:

async def do_the_job():
    with aiohttp.ClientSession() as session:
        coro_1 = session.get('http://httpbin.org/get')
        coro_2 = session.get('http://httpbin.org/ip')
        return combine_responses(await coro_1, await coro_2)

换句话说,我想启动 IO 操作并等待它们的结果,以便它们有效地并行运行。这可以通过asyncio.gather 实现:

async def do_the_job():
    with aiohttp.ClientSession() as session:
        coro_1 = session.get('http://example.com/get')
        coro_2 = session.get('http://example.org/tp')
        return combine_responses(*(await asyncio.gather(coro_1, coro_2)))

接下来,我想要一些复杂的依赖结构。我想在具备所有先决条件时开始运营,并在需要结果时获得结果。这里帮助asyncio.ensure_future 将任务与由事件循环分别管理的协程分开:

async def do_the_job():
    with aiohttp.ClientSession() as session:
        fut_1 = asyncio.ensure_future(session.get('http://httpbin.org/ip'))
        coro_2 = session.get('http://httpbin.org/get')
        coro_3 = session.post('http://httpbin.org/post', data=(await coro_2)
        coro_3_result = await coro_3
        return combine_responses(await fut_1, coro_3_result)

为了在我的逻辑流程中使用协程实现并行非阻塞 IO,我是否必须使用 asyncio.ensure_futureasyncio.gather(实际上使用 asyncio.ensure_future)?有没有更“冗长”的方式?

通常开发人员必须考虑哪些协程应该成为单独的任务并使用上述功能以获得最佳性能,这是真的吗?

在事件循环中使用没有多个任务的协程有什么意义吗?

事件循环任务在现实生活中有多“重”?当然,它们比操作系统线程或进程“更轻”。我应该在多大程度上争取尽可能少的此类任务?

【问题讨论】:

【参考方案1】:

我需要并行发出两个不相关的 HTTP 请求。是什么 最简单的方法?

import asyncio
import aiohttp


async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def main():
    results = await asyncio.gather(
        request('http://httpbin.org/delay/1'),
        request('http://httpbin.org/delay/1'),
    )
    print(len(results))


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

是的,您可以使用asyncio.gather 实现并发或使用asyncio.ensure_future 创建任务。

接下来,我想要一些复杂的依赖结构?我想要 当我具备所有先决条件并获得 当我需要结果时结果。

虽然您提供的代码可以完成工作,但最好将并发流拆分到不同的协程上并再次使用asyncio.gather

import asyncio
import aiohttp


async def request(url):
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as resp:
            return await resp.text()


async def get_ip():
    return await request('http://httpbin.org/ip')


async def post_from_get():
    async with aiohttp.ClientSession() as session:
        async with session.get('http://httpbin.org/get') as resp:
            get_res = await resp.text()
        async with session.post('http://httpbin.org/post', data=get_res) as resp:
            return await resp.text()


async def main():
    results = await asyncio.gather(
        get_ip(),
        post_from_get(),
    )
    print(len(results))


loop = asyncio.get_event_loop()
try:
    loop.run_until_complete(main())
    loop.run_until_complete(loop.shutdown_asyncgens())
finally:
    loop.close()

是不是一般开发者都得想什么协程 应该成为单独的任务并使用上述功能来获得 最佳性能?

由于您使用 asyncio,您可能希望同时运行一些作业以获得性能,对吧? asyncio.gather 是一种说法 - “同时运行这些作业以更快地获得结果”。

如果您不必考虑应该同时运行哪些作业来获得性能,您可以使用简单的同步代码。

在事件中使用没有多个任务的协程是否有意义 循环?

在您的代码中,如果您不想要它,则不必手动创建任务:此答案中的两个 sn-ps 均不使用 asyncio.ensure_future。但在内部asyncio 不断地使用任务(例如,正如您所说,asyncio.gather 使用任务本身)。

事件循环任务在现实生活中有多“重”?当然,他们是 比操作系统线程或进程“更轻”。我应该努力到什么程度 尽可能少地完成此类任务?

异步程序的主要瓶颈是(几乎总是)网络:您根本不必担心异步协程/任务的数量。

【讨论】:

这主要回答了我的问题。 asyncio.gather 和使用协程函数链接或 asyncio.ensure_future 是与协程并行执行 IO 的标准方式。另外,我知道我的示例代码错误地使用了 aiohttp。

以上是关于Python协程中的并行异步IO的主要内容,如果未能解决你的问题,请参考以下文章

Python异步IO

异步IO框架:asyncio 中篇

python异步IO

Python进阶协程和异步 IO

gj12-1 协程和异步io

python学习笔记