使用信号量限制并发 AsyncIO 任务的数量不起作用
Posted
技术标签:
【中文标题】使用信号量限制并发 AsyncIO 任务的数量不起作用【英文标题】:Limiting number of concurrent AsyncIO tasks using Semaphore not working 【发布时间】:2022-01-24 03:57:54 【问题描述】:目标:
我正在尝试同时抓取多个 URL。我不想同时发出太多请求,所以我使用this solution 来限制它。
问题:
正在为所有任务发出请求,而不是一次为有限的数量发出请求。
精简代码:
async def download_all_product_information():
# TO LIMIT THE NUMBER OF CONCURRENT REQUESTS
async def gather_with_concurrency(n, *tasks):
semaphore = asyncio.Semaphore(n)
async def sem_task(task):
async with semaphore:
return await task
return await asyncio.gather(*(sem_task(task) for task in tasks))
# FUNCTION TO ACTUALLY DOWNLOAD INFO
async def get_product_information(url_to_append):
url = 'https://www.amazon.com.br' + url_to_append
print('Product Information - Page ' + str(current_page_number) + ' for category ' + str(
category_index) + '/' + str(len(all_categories)) + ' in ' + gender)
source = await get_source_code_or_content(url, should_render_javascript=True)
time.sleep(random.uniform(2, 5))
return source
# LOOP WHERE STUFF GETS DONE
for current_page_number in range(1, 401):
for gender in os.listdir(base_folder):
all_tasks = []
# check all products in the current page
all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
for product_specific_url in all_products_in_current_page:
current_task = asyncio.create_task(get_product_information(product_specific_url))
all_tasks.append(current_task)
await gather_with_concurrency(random.randrange(8, 15), *all_tasks)
async def main():
await download_all_product_information()
# just to make sure there are not any problems caused by two event loops
if asyncio.get_event_loop().is_running(): # only patch if needed (i.e. running in Notebook, Spyder, etc)
import nest_asyncio
nest_asyncio.apply()
# for asynchronous functionality
if __name__ == '__main__':
asyncio.run(main())
我做错了什么?谢谢!
【问题讨论】:
为什么在async
函数中调用gather_with_concurrency
和asyncio.run
?就等着吧。并使用asyncio.run(main())
而不是老式的loop.xxx
。最重要的是:asyncio.run
的工作方式与 loop.run_until_complete(main())
一样,但在您的 asyncio.run(await gather_with_concurrency(..
中运行您等待的东西
非常感谢您的回复!我更新了代码,但它仍然无法正常工作。我没有正确更新吗?
【参考方案1】:
这行出了什么问题:
current_task = asyncio.create_task(get_product_information(product_specific_url))
当您创建“任务”时,它会立即安排执行。尽快 当您的代码让执行到 asyncio 循环时(在任何“等待”表达式中),asyncio 将循环执行您的所有任务。
信号量,在您指出的原始 sn-p 中,也保护了任务本身的创建,确保一次只有“n”个任务处于活动状态。在那个 sn-p 中传递给gather_with_concurrency
的是协程。
与任务不同,协同例程是准备好等待但尚未调度的对象。它们可以免费传递,就像任何其他对象一样——它们只会在它们被等待或被任务包装时才会被执行(然后当代码将控制权传递给 asyncio 循环时)。
在您的代码中,您正在使用get_product_information
调用创建协同例程,并立即将其包装在任务中。在调用gather_with_concurrency
本身的行中的await
指令中,它们都同时运行。
解决方法很简单:此时不要创建任务,只在信号量保护的代码内创建任务。仅将原始协同程序添加到您的列表中:
...
all_coroutines = []
# check all products in the current page
all_products_in_current_page = open_list(os.path.join(base_folder, gender, category, current_page))
for product_specific_url in all_products_in_current_page:
current_coroutine = get_product_information(product_specific_url)
all_coroutines.append(current_coroutine)
await gather_with_concurrency(random.randrange(8, 15), *all_coroutines)
此代码中仍有一个不相关的错误会导致并发失败:您正在对time.sleep
inside gather_product_information
进行同步调用。这将在此时停止 asyncio 循环
直到睡眠结束。正确的做法是使用await asyncio.sleep(...)
。
【讨论】:
非常感谢您的帮助和彻底的回答!它就像一个魅力,我更了解启动。再次感谢,祝您度过愉快的一周!以上是关于使用信号量限制并发 AsyncIO 任务的数量不起作用的主要内容,如果未能解决你的问题,请参考以下文章