如何在 asyncio 中同时运行任务?
Posted
技术标签:
【中文标题】如何在 asyncio 中同时运行任务?【英文标题】:How to run tasks concurrently in asyncio? 【发布时间】:2019-01-12 03:18:25 【问题描述】:我正在尝试学习如何使用 Python 的 asyncio 模块同时运行任务。在下面的代码中,我有一个模拟的“网络爬虫”作为示例。基本上,我试图让它在任何给定时间最多有两个活动的 fetch() 请求发生,并且我希望在 sleep() 期间调用 process()。
import asyncio
class Crawler():
urlq = ['http://www.google.com', 'http://www.yahoo.com',
'http://www.cnn.com', 'http://www.gamespot.com',
'http://www.facebook.com', 'http://www.evergreen.edu']
htmlq = []
MAX_ACTIVE_FETCHES = 2
active_fetches = 0
def __init__(self):
pass
async def fetch(self, url):
self.active_fetches += 1
print("Fetching URL: " + url);
await(asyncio.sleep(2))
self.active_fetches -= 1
self.htmlq.append(url)
async def crawl(self):
while self.active_fetches < self.MAX_ACTIVE_FETCHES:
if self.urlq:
url = self.urlq.pop()
task = asyncio.create_task(self.fetch(url))
await task
else:
print("URL queue empty")
break;
def process(self, page):
print("processed page: " + page)
# main loop
c = Crawler()
while(c.urlq):
asyncio.run(c.crawl())
while c.htmlq:
page = c.htmlq.pop()
c.process(page)
但是,上面的代码会一个一个地下载 URL(不是一次同时下载两个),并且在获取所有 URL 之前不会进行任何“处理”。如何使 fetch() 任务同时运行,并使其在 sleep() 期间调用 process()?
【问题讨论】:
我的最终目标是编写一个异步网络爬虫,它将在后台不断地从 URL 队列中获取页面,HTML/文本处理与获取同时发生。这只是学习使用 asyncio 的模拟代码......这就是为什么我试图在获取的同时运行处理,而不是在之后才这样做。 运行事件循环是一个阻塞操作。你需要让process
成为一个协同程序,它会被循环调度或被其他东西等待。
【参考方案1】:
您的crawl
方法正在等待每个单独的任务;你应该把它改成这样:
async def crawl(self):
tasks = []
while self.active_fetches < self.MAX_ACTIVE_FETCHES:
if self.urlq:
url = self.urlq.pop()
tasks.append(asyncio.create_task(self.fetch(url)))
await asyncio.gather(*tasks)
编辑:这是一个带有 cmets 的更简洁的版本,它同时获取和处理所有内容,同时保留了对最大获取器数量设置上限的基本能力。
import asyncio
class Crawler:
def __init__(self, urls, max_workers=2):
self.urls = urls
# create a queue that only allows a maximum of two items
self.fetching = asyncio.Queue()
self.max_workers = max_workers
async def crawl(self):
# DON'T await here; start consuming things out of the queue, and
# meanwhile execution of this function continues. We'll start two
# coroutines for fetching and two coroutines for processing.
all_the_coros = asyncio.gather(
*[self._worker(i) for i in range(self.max_workers)])
# place all URLs on the queue
for url in self.urls:
await self.fetching.put(url)
# now put a bunch of `None`'s in the queue as signals to the workers
# that there are no more items in the queue.
for _ in range(self.max_workers):
await self.fetching.put(None)
# now make sure everything is done
await all_the_coros
async def _worker(self, i):
while True:
url = await self.fetching.get()
if url is None:
# this coroutine is done; simply return to exit
return
print(f'Fetch worker i is fetching a URL: url')
page = await self.fetch(url)
self.process(page)
async def fetch(self, url):
print("Fetching URL: " + url);
await asyncio.sleep(2)
return f"the contents of url"
def process(self, page):
print("processed page: " + page)
# main loop
c = Crawler(['http://www.google.com', 'http://www.yahoo.com',
'http://www.cnn.com', 'http://www.gamespot.com',
'http://www.facebook.com', 'http://www.evergreen.edu'])
asyncio.run(c.crawl())
【讨论】:
谢谢 dtanabe。您的解决方案确实使 fetch() 调用同时运行,但它仍将处理推迟到所有这些调用都被提取之后。我将如何做到这一点,以便在下载发生时,在任何已放入 htmlq 的 HTML 上调用 process()? 添加了一个更完整的版本,该版本还可以处理提取的数据,同时允许继续进行其他 URL 提取。【参考方案2】:您可以将htmlq
设为asyncio.Queue()
,并将htmlq.append
更改为htmlq.push
。然后你的main
可以是异步的,像这样:
async def main():
c = Crawler()
asyncio.create_task(c.crawl())
while True:
page = await c.htmlq.get()
if page is None:
break
c.process(page)
您的***代码归结为对asyncio.run(main())
的调用。
一旦你完成了爬取,crawl()
可以将None
加入队列以通知主协程工作已完成。
【讨论】:
这会有额外的好处,通过将队列的大小设置为 2,不需要跟踪并发获取的数量。 谢谢@user4815162342。我以前不知道 asyncio.Queue(),所以这很有帮助。但是当我尝试按照您的建议修改代码时,出现以下错误:“RuntimeError: TaskQueue()
需要在将要运行的同一个事件循环中实例化,并且每个asyncio.run()
都会创建一个new事件循环。在您的情况下,解决方法是将 Crawler
实例化移动到协程;我现在已经相应地编辑了答案。以上是关于如何在 asyncio 中同时运行任务?的主要内容,如果未能解决你的问题,请参考以下文章
《asyncio 系列》4. 如何并发运行多个任务(asyncio.gatherasyncio.as_completedasyncio.wait)
使用 asyncio 时,如何让所有正在运行的任务在关闭事件循环之前完成
如何使用 asyncio 和 concurrent.futures.ProcessPoolExecutor 在 Python 中终止长时间运行的计算(CPU 绑定任务)?
如何使用 asyncio 和 concurrent.futures.ProcessPoolExecutor 在 Python 中终止长时间运行的计算(CPU 绑定任务)?