Why does asyncio's run_in_executor provide so little parallelization when making HTTP requests?

Posted: 2018-10-11 21:41:28

I wrote a benchmarking utility that queries a REST endpoint in batches. It does so in three ways:

1. sequentially, using the requests library,
2. concurrently, using the requests library, but wrapping each request in loop.run_in_executor(),
3. concurrently, using the aiohttp library.

Here are the results for different batch sizes:

batch_size=16

       concur_times  seq_times  concur_aiohttp_times
count     50.000000  50.000000             50.000000
mean       0.123786   0.235883              0.087843
std        0.009733   0.018039              0.029977
min        0.108682   0.210515              0.071560
25%        0.118666   0.222436              0.075565
50%        0.121978   0.231876              0.080050
75%        0.125740   0.242939              0.086345
max        0.169194   0.283809              0.267874

batch_size=4

       concur_times  seq_times  concur_aiohttp_times
count     50.000000  50.000000             50.000000
mean       0.080764   0.091276              0.052807
std        0.008342   0.016509              0.033814
min        0.069541   0.078517              0.041993
25%        0.076142   0.082242              0.044563
50%        0.079046   0.085540              0.045735
75%        0.081645   0.092659              0.049428
max        0.111622   0.170785              0.281397

As the results show, the aiohttp routine is consistently more parallel, by a wide margin. Worse, for the small batch size (4), the second method, using loop.run_in_executor (the "concur_times" column), is only about 1/9 faster than the sequential approach.

Why is that? Is there a problem with my code? I include it below.

I tried swapping the network IO out for sleeps via asyncio.sleep, which produced the expected result: methods 2 and 3 were equally fast, and method 1 was slower by roughly a factor of batch_size.
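That sleep-based experiment can be reproduced in a few lines. Here is a minimal standalone sketch; N and DELAY are arbitrary stand-ins for the batch size and per-request latency:

```python
import asyncio
import time
from timeit import default_timer as now

N = 4        # stand-in for batch_size
DELAY = 0.1  # stand-in for per-request network latency, in seconds

def blocking_sleep():
    time.sleep(DELAY)  # like blocking network IO, sleep releases the GIL

async def via_executor():
    # method 2: wrap each blocking call in the default executor
    loop = asyncio.get_running_loop()
    await asyncio.gather(*(loop.run_in_executor(None, blocking_sleep)
                           for _ in range(N)))

async def via_asyncio():
    # method 3: native coroutines, no threads involved
    await asyncio.gather(*(asyncio.sleep(DELAY) for _ in range(N)))

def timed(coro_factory):
    start = now()
    asyncio.run(coro_factory())
    return now() - start

seq = N * DELAY  # method 1 baseline: the sleeps run one after another
t_exec = timed(via_executor)
t_aio = timed(via_asyncio)
print(f"sequential≈{seq:.2f}s  executor={t_exec:.2f}s  asyncio={t_aio:.2f}s")
```

With sleeps, both concurrent variants finish in roughly DELAY seconds, about batch_size times faster than the sequential baseline, matching the behaviour described above.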

Code:

import asyncio
import requests
from cytoolz.curried import *
import pandas as pd
from timeit import default_timer as now

url = 'https://jsonplaceholder.typicode.com/todos/'

def dl_todo_with_requests(session, n):
        response = session.get(url + str(n))
        assert(response.status_code == 200)
        text = response.text
        return text

dl_todo_with_requests = curry(dl_todo_with_requests)

def seq_dl(todos_to_get):
        with requests.Session() as session:
                todos = pipe(
                        todos_to_get,
                        map( dl_todo_with_requests(session) ),
                        list )
                return todos

get_todos_from_futures = lambda futures: \
        pipe( futures,
                map( lambda fut: fut.result() ),
                list
            )

async def concur_dl(todos_to_get):
        loop = asyncio.get_running_loop()
        with requests.Session() as session:
                completed_futures, _pending = await \
                        pipe(
                        todos_to_get,
                        map( lambda n:
                                loop.run_in_executor(
                                None,
                                lambda: dl_todo_with_requests(session, n)
                                )),
                        list,
                        asyncio.wait
                        );
                todos = get_todos_from_futures(completed_futures)
                return todos

import aiohttp
async def concur_dl_aiohttp(todos_to_get):
        async def dl(session, todo):
                async with session.get(url + str(todo)) as resp:
                        assert(resp.status == 200)
                        # await the body; without the await a bare coroutine
                        # is returned and the response closes before it is read
                        return await resp.text()
        dl = curry(dl)
        async with aiohttp.ClientSession() as session:
                loop = asyncio.get_running_loop()
                unexecuted = pipe(
                        todos_to_get,
                        map( dl(session) ),
                        list )
                # wrap the coroutines in tasks; newer Python versions no
                # longer accept bare coroutines in asyncio.wait
                tasks = [asyncio.ensure_future(c) for c in unexecuted]
                completed_futures, _pending = await asyncio.wait(tasks)
                todos = get_todos_from_futures(completed_futures)
                return todos


def check_todos_received(todos):
        # note: todos_to_get is local to bench(), so check only that every
        # returned todo is non-empty
        assert(len(todos) > 0)
        todo_has_content = lambda todo: len(todo) > len('')
        assert(all(map(todo_has_content, todos)))
        return True

def measure_it(f):
        start = now();
        f()
        elapsed = now() - start
        return elapsed

inspect = lambda f, it: map(do(f), it)
inspect = curry(inspect)

def bench(n_iters=50,batch_size=4):
        todos_to_get = range(1,batch_size+1)
        seq_dl(todos_to_get)  # warm up caches, if any
        measure_seq = lambda: pipe(
                        seq_dl(todos_to_get),
                        inspect(check_todos_received) )
        measure_concur = lambda: pipe(
                        asyncio.run(concur_dl(todos_to_get)),
                        inspect(check_todos_received) )
        measure_concur_aiohttp = lambda: pipe(
                        asyncio.run(concur_dl_aiohttp(todos_to_get)),
                        inspect(check_todos_received) )
        do_the_bench = lambda dl_f, title: \
               pipe( range(n_iters),
                       inspect(
                               lambda n: \
                               print("doing %s/%s %s batch download" \
                                       % (n+1,n_iters,title))),
                        map(lambda _: measure_it(dl_f)),
                        list )
        concur_times = do_the_bench(measure_concur,'concurrent')
        concur_aiohttp_times = do_the_bench(measure_concur_aiohttp,'concurrent_aiohttp')
        seq_times = do_the_bench(measure_seq,'sequential')
        return dict(
                concur_times=concur_times,
                seq_times=seq_times,
                concur_aiohttp_times=concur_aiohttp_times)

The benchmark is run like this: bench(n_iters=50,batch_size=4). The output is then passed through lambda output: pandas.DataFrame(output).describe() to produce the tables.
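For reference, the describe() step looks like this; the timing lists below are made up, but have the shape that bench() returns:

```python
import pandas as pd

# hypothetical output of bench(); real runs have 50 samples per column
output = dict(
    concur_times=[0.12, 0.11, 0.13],
    seq_times=[0.23, 0.22, 0.24],
    concur_aiohttp_times=[0.08, 0.09, 0.07],
)

# one column per method; describe() yields count/mean/std/quantiles
summary = pd.DataFrame(output).describe()
print(summary)
```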

Comments:

Answer 1:

The default executor for asyncio's run_in_executor is ThreadPoolExecutor, which uses Python threads. It is therefore also affected by the GIL, as discussed in this thread.

In your case, only one of the threads running the async jobs executes at any given moment, which is why aiohttp shows better performance.
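The GIL effect is easiest to see with a CPU-bound task, where threads genuinely cannot overlap. A small sketch (the task count and workload size are arbitrary):

```python
import time
from concurrent.futures import ThreadPoolExecutor
from timeit import default_timer as now

def cpu_bound(n):
    # pure-Python loop: holds the GIL for its whole duration
    total = 0
    for i in range(n):
        total += i * i
    return total

N_TASKS, WORK = 4, 2_000_000  # arbitrary sizes

start = now()
seq_results = [cpu_bound(WORK) for _ in range(N_TASKS)]
seq_elapsed = now() - start

start = now()
with ThreadPoolExecutor(max_workers=N_TASKS) as pool:
    thr_results = list(pool.map(cpu_bound, [WORK] * N_TASKS))
thr_elapsed = now() - start

print(f"sequential={seq_elapsed:.2f}s  threaded={thr_elapsed:.2f}s")
```

On a standard CPython build the threaded run is no faster than the sequential one, since only one thread can hold the GIL at a time. (Blocking socket reads in requests do release the GIL, but all the Python-level work around them still serializes.)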

Comments:
