如何在 asyncio 中同时运行任务?

Posted

技术标签:

【中文标题】如何在 asyncio 中同时运行任务?【英文标题】:How to run tasks concurrently in asyncio? 【发布时间】:2019-01-12 03:18:25 【问题描述】:

我正在尝试学习如何使用 Python 的 asyncio 模块同时运行任务。在下面的代码中,我有一个模拟的“网络爬虫”作为示例。基本上,我试图让它在任何给定时间最多有两个活动的 fetch() 请求发生,并且我希望在 sleep() 期间调用 process()。

import asyncio

class Crawler():

    urlq = ['http://www.google.com', 'http://www.yahoo.com', 
            'http://www.cnn.com', 'http://www.gamespot.com', 
            'http://www.facebook.com', 'http://www.evergreen.edu']

    htmlq = []
    MAX_ACTIVE_FETCHES = 2
    active_fetches = 0

    def __init__(self):
        pass

    async def fetch(self, url):
        self.active_fetches += 1
        print("Fetching URL: " + url);
        await(asyncio.sleep(2))
        self.active_fetches -= 1
        self.htmlq.append(url)

    async def crawl(self):
        while self.active_fetches < self.MAX_ACTIVE_FETCHES:
            if self.urlq:
                url = self.urlq.pop()
                task = asyncio.create_task(self.fetch(url))
                await task
            else:
                print("URL queue empty")
                break;

    def process(self, page):
        print("processed page: " + page)

# main loop

c = Crawler()
while(c.urlq):
    asyncio.run(c.crawl())
    while c.htmlq:
        page = c.htmlq.pop()
        c.process(page)

但是,上面的代码会一个一个地下载 URL(不是一次同时下载两个),并且在获取所有 URL 之前不会进行任何“处理”。如何使 fetch() 任务同时运行,并使其在 sleep() 期间调用 process()?

【问题讨论】:

我的最终目标是编写一个异步网络爬虫,它将在后台不断地从 URL 队列中获取页面,HTML/文本处理与获取同时发生。这只是学习使用 asyncio 的模拟代码......这就是为什么我试图在获取的同时运行处理,而不是在之后才这样做。 运行事件循环是一个阻塞操作。你需要让process 成为一个协同程序,它会被循环调度或被其他东西等待。 【参考方案1】:

您的crawl 方法正在等待每个单独的任务;你应该把它改成这样:

async def crawl(self):
    tasks = []
    while self.active_fetches < self.MAX_ACTIVE_FETCHES:
        if self.urlq:
            url = self.urlq.pop()
            tasks.append(asyncio.create_task(self.fetch(url)))
    await asyncio.gather(*tasks)

编辑:这是一个带有 cmets 的更简洁的版本,它同时获取和处理所有内容,同时保留了对最大获取器数量设置上限的基本能力。

import asyncio

class Crawler:

    def __init__(self, urls, max_workers=2):
        self.urls = urls
        # create a queue that only allows a maximum of two items
        self.fetching = asyncio.Queue()
        self.max_workers = max_workers

    async def crawl(self):
        # DON'T await here; start consuming things out of the queue, and
        # meanwhile execution of this function continues. We'll start two
        # coroutines for fetching and two coroutines for processing.
        all_the_coros = asyncio.gather(
            *[self._worker(i) for i in range(self.max_workers)])

        # place all URLs on the queue
        for url in self.urls:
            await self.fetching.put(url)

        # now put a bunch of `None`'s in the queue as signals to the workers
        # that there are no more items in the queue.
        for _ in range(self.max_workers):
            await self.fetching.put(None)

        # now make sure everything is done
        await all_the_coros

    async def _worker(self, i):
        while True:
            url = await self.fetching.get()
            if url is None:
                # this coroutine is done; simply return to exit
                return

            print(f'Fetch worker i is fetching a URL: url')
            page = await self.fetch(url)
            self.process(page)

    async def fetch(self, url):
        print("Fetching URL: " + url);
        await asyncio.sleep(2)
        return f"the contents of url"

    def process(self, page):
        print("processed page: " + page)


# main loop
c = Crawler(['http://www.google.com', 'http://www.yahoo.com', 
             'http://www.cnn.com', 'http://www.gamespot.com', 
             'http://www.facebook.com', 'http://www.evergreen.edu'])
asyncio.run(c.crawl())

【讨论】:

谢谢 dtanabe。您的解决方案确实使 fetch() 调用同时运行,但它仍将处理推迟到所有这些调用都被提取之后。我将如何做到这一点,以便在下载发生时,在任何已放入 htmlq 的 HTML 上调用 process()? 添加了一个更完整的版本,该版本还可以处理提取的数据,同时允许继续进行其他 URL 提取。【参考方案2】:

您可以将htmlq 设为asyncio.Queue(),并将htmlq.append 更改为htmlq.push。然后你的main 可以是异步的,像这样:

async def main():
    c = Crawler()
    asyncio.create_task(c.crawl())
    while True:
        page = await c.htmlq.get()
        if page is None:
            break
        c.process(page)

您的***代码归结为对asyncio.run(main()) 的调用。

一旦你完成了爬取,crawl() 可以将None 加入队列以通知主协程工作已完成。

【讨论】:

这会有额外的好处,通过将队列的大小设置为 2,不需要跟踪并发获取的数量。 谢谢@user4815162342。我以前不知道 asyncio.Queue(),所以这很有帮助。但是当我尝试按照您的建议修改代码时,出现以下错误:“RuntimeError: Task cb=[_run_until_complete_cb() at /usr/lib/python3.7/asyncio/base_events.py:158]> 将 Future 附加到不同的循环" ...我不确定是否需要修改 crawl()以某种方式使用您的代码,或者正在发生什么,但似乎在“page = await self.htmlq.get()”行上出现了一些问题......有什么想法吗? @J.Taylor 问题是Queue() 需要在将要运行的同一个事件循环中实例化,并且每个asyncio.run() 都会创建一个new事件循环。在您的情况下,解决方法是将 Crawler 实例化移动到协程;我现在已经相应地编辑了答案。

以上是关于如何在 asyncio 中同时运行任务?的主要内容,如果未能解决你的问题,请参考以下文章

《asyncio 系列》4. 如何并发运行多个任务(asyncio.gatherasyncio.as_completedasyncio.wait)

使用 asyncio 时,如何让所有正在运行的任务在关闭事件循环之前完成

如何使用 asyncio 和 concurrent.futures.ProcessPoolExecutor 在 Python 中终止长时间运行的计算(CPU 绑定任务)?

如何使用 asyncio 和 concurrent.futures.ProcessPoolExecutor 在 Python 中终止长时间运行的计算(CPU 绑定任务)?

如何使用 asyncio 同时运行无限循环?

Python - 如何使用 asyncio 同时运行多个协程?