Python 的 asyncio.gather() 似乎没有异步运行任务

Posted

技术标签:

【中文标题】Python 的 asyncio.gather() 似乎没有异步运行任务【英文标题】:Python's asyncio.gather() doesn't seem to be running tasks asynchronously 【发布时间】:2021-07-19 11:30:43 【问题描述】:

我需要异步运行 20 个任务(每个任务运行相同的函数,但参数不同)。每个任务都使用 Python 的 yfinance API 模块。这是我目前的方法:

    定义一个包含 20 个元素的列表args;每个元素都是要传递给相应任务的参数。 定义一个异步函数 get_data,我将每次使用不同的参数运行 20 次。 定义一个异步函数main,它将使用asyncio.gather异步运行20个任务。

这是(伪)代码:

import asyncio

stocks = []
args = ['arg1', 'arg2', ... , 'arg20']


async def get_data(arg):
    stock = Stock(arg)
    # do some yfinance calls
    return stock


async def main():
    global stocks
    tasks = [asyncio.ensure_future(get_data(arg)) for arg in args]
    stocks = await asyncio.gather(*tasks)


asyncio.run(main())

print(stocks)  # should be a list of 20 return values from the 20 tasks

假设每个任务单独运行需要 4 秒。如果它是异步运行的,那么这 20 个任务应该在 4 秒内运行。但是,它在 80 秒内运行。如果我删除所有异步代码并同步运行它,它会在相同的时间内运行。有什么帮助吗?

谢谢。

【问题讨论】:

你的 get_data() 函数没有等待任何东西,这是一个危险信号,它只是名义上的异步,但实际上是阻塞的。要获得 asyncio 的好处,您需要使用异步库来访问股票(或代码需要的任何其他内容),并使用 await 您可能想了解“异步”的实际含义——它与“并行”相同。 How does asyncio actually work? 可能值得一读,尽管很长。 @S.Naj 有什么反馈吗? 作为异步代码的新手,我没有意识到异步和“并行”之间的区别。 @ArtiomKozyrev 的解决方案完全符合预期,所以我想我了解到 ThreadPoolExecutor 模块“并行”运行同步代码,这与异步运行代码不同。 【参考方案1】:

我检查了yfinance 的文档并在需求中看到requests 库,该库不是异步的。这意味着你不应该将它与 asyncio 模块一起使用,而应该使用 theading.Threadconcurrent.futures.ThreadPoolExecutor

我为您制作了以下示例,请运行它并分享您的结果。

from concurrent.futures import ThreadPoolExecutor
import yfinance as yf
from pprint import pprint
from time import monotonic


def get_stocks_data(name: str) -> dict:
    """some random function which extract some data"""
    tick = yf.Ticker(name)
    tick_info = tick.info
    return tick_info


if __name__ == '__main__':
    # some random stocks
    stocks = [
        'AAPL', 'AMD', 'AMZN', 'FB', 'GOOG', 'MSFT', 'TSLA', 'MSFT',
        'AAPL', 'AMD', 'AMZN', 'FB', 'GOOG', 'MSFT', 'TSLA', 'MSFT',
    ]
    start_time = monotonic()
    # you can choose max_workers number higher and check if app works faster
    # e.g choose 16 as max number of workers
    with ThreadPoolExecutor(max_workers=4) as pool:
        results = pool.map(get_stocks_data, stocks)

    for r in results:
        pprint(r)

    print("*" * 150)
    print(monotonic() - start_time)

【讨论】:

此解决方案完全符合预期。对于同步运行的 10 个任务,运行时间约为 60 秒。使用此解决方案,运行时间约为 7 秒,正如预期的那样。非常感谢您的努力和承诺。

以上是关于Python 的 asyncio.gather() 似乎没有异步运行任务的主要内容,如果未能解决你的问题,请参考以下文章

在超时中包装 asyncio.gather

使用 asyncio.gather() 的协程/期货的经过时间

asyncio.gather() 在具有协程字段的 dict 列表上?

Python学习---Python的异步---asyncio模块(no-http)

在asyncio 中跳出正在执行的task

异步发送请求的几种方式