在 Python 中使用 asyncio 并行化 Web 任务

Posted

技术标签:

【中文标题】在 Python 中使用 asyncio 并行化 Web 任务【英文标题】:Parallelise web tasks with asyncio in Python 【发布时间】:2014-10-28 11:03:21 【问题描述】:

我正试图围绕 asyncio 和 aiohttp 进行思考,多年来编程第一次让我感到完全愚蠢和无能。这有点漂亮,以一种奇怪的禅宗方式。但是,唉,还有工作要做。

我有一个现有的课程,可以在网络上做许多奇妙的事情,比如注册一个网站、获取数据、工作。现在我需要 100 或 1000 只这样的小工蜂来注册。代码大致如下:

class Worker(object):
    def signup(self, ...):
        ...
        data = self.make_request(url, data)
        self.user_id = data.get("user_id")
        return self

    def make_request(self, url, data):
        response = requests.post(url, data=data)
        return response.json()

workers = [Worker().signup() for n in range(100)]

如您所见,我们使用 requests 模块发出 POST 请求。然而这是阻塞的,所以我们必须等待工人 N 完成注册,然后才能开始注册工人 N+1。幸运的是,Worker 类的原作者(听起来很迷人的马克思主义者)以她无限的智慧将每个 HTTP 调用都封装在 self.make_request 方法中,因此使整个 Worker 非阻塞应该只是将 requests 库换成一个非阻塞一个 aaaa 和 bob 是你的叔叔,对吧?这是我走了多远:

class AyncWorker(Worker):
    @asyncio.coroutine
    def make_request(self, url, data):
        response = yield from aiohttp.request('post', url, data=data)
        return (yield from response.json())

coroutines = [Worker().signup() for n in range(100)]
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait(coroutines))
loop.close()

但这会在signup 方法中引发AttributeError: 'generator' object has no attribute 'get',而我使用self.user_id = data.get("user_id")。除此之外,我仍然没有一本整洁的字典中的工人。我知道我很可能完全误解了 asyncio 的工作原理 - 但我已经花了一天时间阅读各种文档、David Beazly 的令人震惊的教程以及大量玩具示例,这些示例足以让我理解它们适用于这种情况很简单。我应该如何构建我的工作人员和异步循环以并行注册 100 个工作人员并最终在他们注册后获得所有工作人员的列表?

【问题讨论】:

一旦你接触到异步协程,你必须以协程方式使用它们。这意味着您的signup 方法必须是一个协程,它执行类似data = yield from self.make_request(url, data) 的操作。您在 asyncio 协程中返回的所有内容都只能与 yield from 一起使用。 【参考方案1】:

一旦你在一个函数中使用了yield(或yield from),这个函数就变成了一个协程。这意味着你不能通过调用它来得到结果:你会得到一个generator object。您至少必须这样做:

@asyncio.coroutine
def some_coroutine(*args):
    #...
    #...
    yield from tasty.asyncio.function()
    return result

def coroutine_user():
    # data = some_coroutine() will give you a generator object instead of result
    data = yield from some_coroutine()
    return data # data here is a plain result: you can call your .get or whatever

猜猜你打电话给coroutine_user()会发生什么:

>>> coroutine_user()
<generator object coroutine_user at 0x7fe13b8a47e0>

缺少async.coroutine 装饰器根本没有帮助:协程具有传染性!要在函数中获得结果,您必须使用yield from。它将你的函数变成另一个协程!

虽然事情并不总是那么糟糕(通常你可以手动迭代生成器对象而不依赖于yield from),asyncio 会特别阻止你这样做:它破坏了一些内部结构(你只能从 Future 或asyncio.coroutine)。所以只需使用concurrent.futures 或类似的东西,除非你要将所有代码都变成协程。作为替代方案,将aiohttp.request 的所有用户与通常的方法隔离开,并使用基于协程的异步工作者和同步的普通旧代码。潜入 asyncio 并实际重构所有代码也是一种选择,显然:您基本上需要在每次调用任何受感染的 asyncio 方法之前放置 yield from

【讨论】:

澄清一下:您必须将yield fromasyncio 协同程序一起使用,因为这是将控制权返回给事件循环并允许协同程序实际运行的唯一方法。如果您被允许只遍历asyncio.Future,您将永远无法将控制权交还给事件循环,协程将永远不会真正运行,并且您最终会尝试在Future 实际运行之前访问它的结果可用的。 concurrent.futures.Future 从多个线程或进程中获取并发,而不是非阻塞 I/O 和事件循环,因此它没有这个限制。

以上是关于在 Python 中使用 asyncio 并行化 Web 任务的主要内容,如果未能解决你的问题,请参考以下文章

使用 Python 3.7+ 进行 100k API 调用,使用 asyncio 并行进行 100 [重复]

Python asyncio ensure_future 装饰器

Python3 asyncio 简介

asyncio/aiohttp 不返回响应

为啥在 python 中对 asyncio 服务器的多个请求的时间会增加?

我如何在异步中使用请求?