使用信号量限制并发 AsyncIO 任务的数量不起作用

Posted

技术标签:

【中文标题】使用信号量限制并发 AsyncIO 任务的数量不起作用【英文标题】:Limiting number of concurrent AsyncIO tasks using Semaphore not working 【发布时间】:2022-01-24 03:57:54 【问题描述】:

目标

我正在尝试同时抓取多个 URL。我不想同时发出太多请求,所以我使用this solution 来限制它。

问题

正在为所有任务发出请求,而不是一次为有限的数量发出请求。

精简代码

async def download_all_product_information():
    # TO LIMIT THE NUMBER OF CONCURRENT REQUESTS
    async def gather_with_concurrency(n, *tasks):
        semaphore = asyncio.Semaphore(n)

        async def sem_task(task):
            async with semaphore:
                return await task

        return await asyncio.gather(*(sem_task(task) for task in tasks))

    # FUNCTION TO ACTUALLY DOWNLOAD INFO
    async def get_product_information(url_to_append):
        url = 'https://www.amazon.com.br' + url_to_append

        print('Product Information - Page ' + str(current_page_number) + ' for category ' + str(
            category_index) + '/' + str(len(all_categories)) + ' in ' + gender)

        source = await get_source_code_or_content(url, should_render_javascript=True)
        time.sleep(random.uniform(2, 5))

        return source

    # LOOP WHERE STUFF GETS DONE
    for current_page_number in range(1, 401):
        for gender in os.listdir(base_folder):
                all_tasks = []

                # check all products in the current page
                all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
                for product_specific_url in all_products_in_current_page:
                    current_task = asyncio.create_task(get_product_information(product_specific_url))

                    all_tasks.append(current_task)

                await gather_with_concurrency(random.randrange(8, 15), *all_tasks)

async def main():
    await download_all_product_information()

# just to make sure there are not any problems caused by two event loops
if asyncio.get_event_loop().is_running():  # only patch if needed (i.e. running in Notebook, Spyder, etc)
    import nest_asyncio

    nest_asyncio.apply()

# for asynchronous functionality
if __name__ == '__main__':
    asyncio.run(main())

我做错了什么?谢谢!

【问题讨论】:

为什么在async 函数中调用gather_with_concurrencyasyncio.run?就等着吧。并使用asyncio.run(main()) 而不是老式的loop.xxx。最重要的是:asyncio.run 的工作方式与 loop.run_until_complete(main()) 一样,但在您的 asyncio.run(await gather_with_concurrency(.. 中运行您等待的东西 非常感谢您的回复!我更新了代码,但它仍然无法正常工作。我没有正确更新吗? 【参考方案1】:

这行出了什么问题:

current_task = asyncio.create_task(get_product_information(product_specific_url))

当您创建“任务”时,它会立即安排执行。尽快 当您的代码让执行到 asyncio 循环时(在任何“等待”表达式中),asyncio 将循环执行您的所有任务。

信号量,在您指出的原始 sn-p 中,也保护了任务本身的创建,确保一次只有“n”个任务处于活动状态。在那个 sn-p 中传递给gather_with_concurrency 的是协程。

与任务不同,协同例程是准备好等待但尚未调度的对象。它们可以免费传递,就像任何其他对象一样——它们只会在它们被等待或被任务包装时才会被执行(然后当代码将控制权传递给 asyncio 循环时)。

在您的代码中,您正在使用get_product_information 调用创建协同例程,并立即将其包装在任务中。在调用gather_with_concurrency 本身的行中的await 指令中,它们都同时运行。

解决方法很简单:此时不要创建任务,只在信号量保护的代码内创建任务。仅将原始协同程序添加到您的列表中:

...
all_coroutines = []
# check all products in the current page
all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
for product_specific_url in all_products_in_current_page:
     current_coroutine = get_product_information(product_specific_url)

     all_coroutines.append(current_coroutine)

     await gather_with_concurrency(random.randrange(8, 15), *all_coroutines)

此代码中仍有一个不相关的错误会导致并发失败:您正在对time.sleepinside gather_product_information 进行同步调用。这将在此时停止 asyncio 循环 直到睡眠结束。正确的做法是使用await asyncio.sleep(...)

【讨论】:

非常感谢您的帮助和彻底的回答!它就像一个魅力,我更了解启动。再次感谢,祝您度过愉快的一周!

以上是关于使用信号量限制并发 AsyncIO 任务的数量不起作用的主要内容,如果未能解决你的问题,请参考以下文章

是否可以限制在 asyncio 中同时运行的协程数量?

如何限制运行的并行任务数量? [关闭]

asyncio:Python异步编程模块

Python Module — asyncio 协程并发

AFNetworking 2:限制并发下载任务的最大数量

如何在asyncio python中使用子进程模块限制并发进程数