asyncio--python3未来并发编程主流充满野心的模块

Posted 来自东方地灵殿的小提琴手

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了asyncio--python3未来并发编程主流充满野心的模块相关的知识,希望对你有一定的参考价值。

楔子

asyncio 是 Python 在 3.5 版本中正式引入的标准库,这是 Python 未来并发编程的主流,非常重要的一个模块。有一个 Web 框架叫 sanic,就是基于 asyncio,使用 sanic 可以达到匹配 Go 语言的并发量(有点夸张了,还是有差距的,但至少在一个量级)。

asyncio 模块提供了使用协程构建并发应用的工具,threading 模块通过应用线程实现并发,multiprocessing 使用系统进程实现并发。asyncio 使用一种单线程、单进程模式实现并发,应用的各个部分会彼此合作,在最优的时刻显式的切换任务。大多数情况下,会在程序阻塞等待读写数据时发生这种上下文切换,不过 asyncio 也支持调度代码在将来的某个特定时间运行,从而支持一个协程等待另一个协程完成,以处理系统信号和识别其他一些事件(这些事件可能导致应用改变其工作内容)。

asyncio中,有几个非常重要的概念。

  • coroutine 对象(协程对象):调用一个使用 async def 定义的函数会返回一个协程对象,协程对象无法直接执行,需要将其注册到事件循环中,由事件循环调用。
  • Future 对象(未来对象):在 asyncio 中,如何才能得到异步调用的结果呢?先设计一个对象,异步调用执行完的时候,就把结果放在它里面,这种对象称之为未来对象。未来对象有一个 result 方法,可以获取未来对象的内部结果。还有个 set_result 方法,是用于设置 result 的。set_result 设置的是什么,调用 result 得到的就是什么。Future 对象可以看作下面的 Task 对象的容器。
  • Task 对象(任务):一个协程就是一个原生可以挂起的函数,任务则是对象协程的进一步封装,里面可以包含协程在执行时的各种状态,关于 Task 和 Future 两者之前的关系我们后面会说。
  • event loop(事件循环):程序开启一个无限循环,可以把一些协程注册到事件循环中,当满足事件发生的时候,就会执行相应的协程。
  • async / await 关键字:Python 3.5 开始引入的用于定义协程函数的关键字,async def 定义一个协程函数,调用协程函数会创建协程对象;在一个协程中可以驱动另一个协程,而驱动的方式就是使用 await 关键字。

使用其它并发模型的大多数程序都采用线性方式编写,而且依赖于语言运行时系统或操作系统的底层线程、进程管理来适当地改变上下文。基于 asyncio 的应用要求应用代码显式地处理上下文切换,要正确地使用相关技术,这取决于是否能正确理解一些相关联的概念。

asyncio 提供的框架以一个事件循环(event loop)为中心,这是一个首类对象,负责高效地处理I / O 事件、系统事件、和应用上下文切换。目前已经提供了多个循环实现来高效地利用操作系统的功能。尽管通常会自动选择一个合理的默认实现,但也完全可以在应用中选择某个特定的事件循环实现。

与事件循环交互的应用要显式地注册将运行的代码,让事件循环在资源可用时向应用代码发出必要的调用。

例如:一个网络服务器打开套接字,然后注册为当这些套接字上出现输入事件时服务器要得到的通知。

事件循环在建立一个新的进入链接或者在数据可读取时都会提醒服务器代码,当前上下文中没有更多工作可做时,应用代码要再次短时间地交出控制权。

例如:如果一个套接字没有更多的数据可以接收,那么服务器会把控制权交给事件循环。

所以,就是把事件注册到事件循环中,不断地循环这些事件,可以处理了那么就去处理,如果卡住了,那么把控制权交给事件循环,继续执行其他可执行的任务。

像传统的 twisted、gevent、以及 tornado,都是采用了事件循环的方式,这种模式只适用于高 I / O,低 CPU 的场景,一旦出现了耗时的复杂运算,那么所有任务都会被卡住。

将控制权交给事件循环的机制依赖于协程(coroutine),这是一些特殊的函数,可以将控制返回给调用者而不丢失其状态。

协程与生成器非常类似,实际上,在 python3.5 版本之前还未对协程提供原生支持时,可以用生成器来实现协程。asyncio 还为协议(protocol)和传输(transport)提供了一个基于类的抽象层,可以使用回调编写代码而不是直接编写协程。在基于类的模型和协程模型时,可以通过重新进入事件循环显式地改变上下文,以取代 Python 多线程实现中隐式的上下文改变。

创建一个协程并执行

那么我们看看如何创建一个协程,协程是一个专门设计用来实现并发操作的语言构造。在早期版本,是使用yield来模拟协程,但它本质上是一个生成器,但是从 Python3.5 开始,Python 已经支持原生协程。通过 async def 定义一个协程函数,调用协程函数会创建一个协程(对象),协程中可以使用 await 关键字驱动另一个协程执行。

# 使用 async def 可以直接定义一个协程函数
async def coroutine():
    print("in coroutine")

# 得到一个协程对象
c = coroutine()
print(c)  # <coroutine object coroutine at 0x000002903CB322C0>
print(type(c))  # <class \'coroutine\'>

# 协程是没有办法直接执行的, 我们需要扔到事件循环中
import asyncio
loop = asyncio.get_event_loop()  # 创建一个事件循环
loop.run_until_complete(c)
"""
in coroutine
"""
loop.close()  # 关闭事件循环

run_until_complete 方法会启动协程,也可以同时启动多个协程,当所有的协程都运行关闭时,会停止循环。

值得一提的是,从 Python3.7 开始,async 和 await 已经是关键字了,我们之前说的关键字其实是保留关键字,意思是你可以使用 async 和 await 作为变量名,但是在 Python3.7之后,就不可以了。另外在 Python3.7 中还提供了另一种运行协程的方法。

import asyncio

async def coroutine():
    print("in coroutine")

c = coroutine()
# asyncio.run 内部包含了创建事件循环、执行协程、关闭事件循环等一套逻辑
asyncio.run(c)
"""
in coroutine
"""

如果协程有返回值呢?

import asyncio

async def coroutine():
    print("in coroutine")
    return "result"

loop = asyncio.get_event_loop()
c = coroutine()
result = loop.run_until_complete(c)
print(result)
"""
in coroutine
result
"""
loop.close()


# 对于 asyncio.run 而言也是一样的
c = coroutine()
result = asyncio.run(c)
print(result)
"""
in coroutine
result
"""

但是注意了,如果一个协程已经运行完毕了,那么就不能再扔到事件循环中了,举个栗子:

import asyncio

async def coroutine():
    return "result"

loop = asyncio.get_event_loop()
try:
    c = coroutine()
    result = loop.run_until_complete(c)
    print(result)  # result
    # 再次运行
    loop.run_until_complete(c)
except RuntimeError as e:
    print(e)  # cannot reuse already awaited coroutine
finally:
    loop.close()

如果一个协程已经被扔到事件循环中执行完毕了,那么它就已经是 awaited 的协程了,关于 await 我们后面会说。这个时候要是把 awaited 的协程再扔到事件循环中,那么就会报错。

多个协程合作

一个协程还可以驱动另一个协程执行,并等待其返回结果,从而可以更容易地将一个任务分解为多个可重用的部分。

import asyncio

async def worker():
    print("worker....")
    # 使用 await 方法会驱动协程 consumer() 执行, 并得到其返回值
    # 这里类似于函数调用一样, 但是协程需要加上一个 await
    res = await consumer()
    print(res)


async def consumer():
    return "i am consumer"


asyncio.run(worker())
"""
worker....
i am consumer
"""

在这里,使用 await 关键字,而不是向循环中增加新的协程。因为控制流已经在循环管理的一个协程中,所以没必要告诉事件循环来管理这些协程。另外,协程可以并发运行,但前提是多个协程。这个协程卡住了,可以切换到另一个协程。但是就卡住的协程本身来说,该卡多长时间还是多长时间,不可能说跳过卡住的部分执行下面的代码。

另外我们还可以通过装饰器来模拟协程,协程函数是 asyncio 设计中的关键部分,它们提供了一个语言构造,可以停止程序某一部分的执行,保留这个调用的状态,并在以后重新进入这个状态,这些动作都是并发框架很重要的功能。Python3.5中引入了一些新的语言特性,可以使用 async def 以原生方式定义这些协程函数,以及使用 await 交出控制,asyncio 的例子应用了这些新特性。但是早期版本,可以使用 asyncio.coroutine 装饰器将函数装饰成一个协程函数,并使用 yield from 来达到同样其它协程执行的效果。

import asyncio

@asyncio.coroutine
def worker():
    print("worker....")
    res = yield from consumer()
    print(res)


@asyncio.coroutine
def consumer():
    return "i am consumer"


asyncio.run(worker())
"""
worker....
i am consumer
"""

我们看到使用生成器依旧可以达到这样的效果,然而尽管使用生成器可以达到同样的效果,但还是推荐使用 async 和 await,原因如下:

  • 生成器既可以做生成器,又可以包装为协程,那么它到底是协程还是生成器呢?这会使得代码出现混乱
  • 生成器既然叫生成器,那么就应该做自己
  • 基于async的原生协程比使用yield装饰器的协程要快,大概快10-20%

并且在 Python3.8 中,已经警告了,不建议使用这种方法,定义一个协程函数应该使用 async def。

Task 与 Future

对于协程来说,是没有办法直接放到事件循环里面运行的,需要的是 Task 对象(任务)。而我们刚才之所以直接将协程扔进去,是因为 asyncio 内部会有检测机制,如果是协程的话,会自动将协程包装成一个 Task 对象。

import asyncio

async def coroutine():
    print(123)


loop = asyncio.get_event_loop()
# 如何创建一个任务呢?
task = loop.create_task(coroutine())
loop.run_until_complete(task)
"""
123
"""

因此一个协程是一个可以原生挂起的函数,而一个 Task 对象则是对协程的进一步封装,里面包含了协程的各种执行状态。

而 Future 被称之为未来对象,我觉得可以把它看成是 Task 对象的容器,因为 Task 继承自 Future。任务执行完毕的时候会通过 set_result 设置返回值,然后外部可以调用 result 获取 set_result 设置的返回值。而 Task 对象内部是没有这两个方法的,它们属于 Future 对象,但是 Task 是 Future 的子类,所以它也可以使用。我们可以通过 asyncio 直接创建一个 Future 对象,但是创建 Task 对象则需要一个协程对象。

import asyncio

def mark_done(future, result):
    print("setting result")
    future.set_result(result)


async def main(loop):
    future = asyncio.Future()
    print("scheduling mark_done")
    # loop.call_later 我们后面会说, 总之这一步是不会阻塞的, 会在两秒钟之后执行
    loop.call_later(2, mark_done, future, "the result")
    print("~~~~~~~~~~~~~~")

    # await future 会等待这个 future 完成, 但什么时候完成呢?
    # 当这个 future 执行 set_result 的时候就代表它完成了, 然后 await future 会返回 set_result 设置的值, 相当于 future.result()
    # 其实我们 await 一个协程也是一样, 也是当协程对应的任务执行完毕、将返回值进行 set_result 的时候
    # 然后我们知道 await 协程 得到的就是当前定义的协程函数的返回值, 其实准确来说,应该是协程对应的 Task 对象的 result()
    # 只不过 result() 得到的就是 set_result 设置进去的值, 而 set_result 设置进去的正式当前定义的协程函数的返回值
    # 尽管是一样的, 但是这个逻辑还是要理清
    res = await future
    print("res =", res)


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
scheduling mark_done
setting result
res = the result
"""

所以一个协程对应一个任务,而任务继承自未来对象。当调用未来对象内部的 set_result 的时候,代表这个任务执行完毕了。我们 await 协程 的时候可以拿到的返回值,就是 set_result 时设置的值,本质上就是协程函数的返回值,相当于 future.result()、或者 task.result()。

除了做法与协程类似,future 还可以绑定回调,回调的顺序按照其注册的顺序调用。

import asyncio
import functools


def callback(future, n):
    print(f"future result: {future.result()} n:{n}")


async def register_callback(future):
    print("register callback on futures")
    # 设置一个回调,只能传递函数名,触发回调的时候,会自动将future本身作为第一个参数传递给回调函数
    # 回调什么时候执行,还是那句话,当 future 执行set_result的时候执行
    future.add_done_callback(functools.partial(callback, n=1))
    future.add_done_callback(functools.partial(callback, n=2))


async def main(future):
    # 等待回调注册完成
    await register_callback(future)
    print("setting result of future")
    future.set_result("the result")


event_loop = asyncio.get_event_loop()
# 可以直接创建一个未来对象
future = asyncio.Future()
event_loop.run_until_complete(main(future))
"""
register callback on futures
setting result of future
future result: the result n:1
future result: the result n:2
"""

执行任务

任务是与事件循环交互的主要途径之一,任务可以包装协程,并跟踪协程何时完成。另外 Task 继承自 Future,所以它是可以等待的。每个任务都有一个结果,是通过调用 future 内部 set_result 方法设置的,设置的值就是协程的返回值,并且可以通过调用 result() 获取这些结果,当然这些上面说过了,这里不再赘述了。

import asyncio


# 启动一个任务,可以使用 create_task 函数创建一个 Task 对象
# 只要循环还在运行而且协程没有返回, create_task 得到的任务便会作为事件循环管理的并发操作的一部分运行
async def task_func():
    print("in task func")
    return "the result"


async def main(loop):
    print("creating task")
    # 除了使用loop.create_task,我们还可以使用asyncio.ensure_future
    # 对于传入一个协程的话,asyncio.ensure_future还是调用了loop.create_task
    task = loop.create_task(task_func())
    print(f"wait for {task}")
    return_value = await task
    print(f"task completed {task}")
    print(f"return value {return_value}")



loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
creating task
wait for <Task pending name=\'Task-2\' coro=<task_func() running at D:/satori/2.py:6>>
in task func
task completed <Task finished name=\'Task-2\' coro=<task_func() done, defined at D:/satori/2.py:6> result=\'the result\'>
return value the result
"""
# 在我们还没有await驱动任务执行的时候, 是Task pending
# 当await之后, 已处于finished状态, 我们看到了 result, 这个就是调用 set_result 设置进去的

通过 create_task 可以创建任务,那么也可以在任务完成前取消操作。

import asyncio


async def task_func():
    print("in task func")
    return "the result"


async def main(loop):
    print("creating task")
    task = loop.create_task(task_func())

    print("canceling task")
    # 任务创建之后,可以调用cancel函数取消
    task.cancel()
    print(f"canceled task: {task}")

    try:
        # 任务取消之后再await则会引发CancelledError
        await task
    except asyncio.CancelledError:
        print("caught error from canceled task")
    else:
        print(f"task result: {task.result()}")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()
"""
creating task
canceling task
canceled task: <Task cancelling name=\'Task-2\' coro=<task_func() running at D:/satori/2.py:4>>
caught error from canceled task
"""

调用常规函数

事件循环中有三个函数,分别是 call_soon、call_later、call_at,我们来看看它们的用法。

call_soon

可以使用这个函数给协程绑定一个回调,从名字也能看出来是立即执行,只不过是遇到阻塞立即执行。

import asyncio
from functools import partial

\'\'\'
除了管理协程和I/P回调,asyncio事件循环还可以根据循环中保存的一个定时器值来调度常规函数调用。
\'\'\'
# 如果回调的时间不重要,那么可以使用call_soon调度下一次循环迭代的调用


def callback(*args, **kwargs):
    print("callback:", args, kwargs)


async def main(loop):
    print("register callback")
    # 接收一个回调函数,和参数
    loop.call_soon(callback, "mashiro", 16)
    print("********")
    # 另外call_soon不支持使用关键字参数来向回调传递参数
    # 所以如果想使用关键字参数,需要使用偏函数转换一下
    # 其实不仅是这里的call_sonn,以及后面要介绍的call_later和call_at都不支持使用关键字参数来向回调传递参数
    # 因此如果不想使用偏函数来包装的话,就直接使用位置参数就可以了
    wrapped = partial(callback, **{"name": "satori", "age": 16})
    loop.call_soon(wrapped, "mahsiro", 16)
    print("—————————")

    await asyncio.sleep(0.6)


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
register callback
********
—————————
callback: (\'mashiro\', 16) {}
callback: (\'mahsiro\', 16) {\'name\': \'satori\', \'age\': 16}
"""
# 另外我们发现我们在调用了call_soon之后没有立刻执行,而是先进性了print
# 这是因为只有在遇到阻塞才会立刻执行,所以当遇到await asyncio.sleep的时候会去执行
# 另外这里的阻塞,不能是time.sleep,必须是可以awaitable的阻塞

call_later

同样是给一个协程绑定一个回调,但是从名字也能看出来这需要指定一个时间,表示多长时间之后调用。

import asyncio

\'\'\'
要将回调推迟到将来的某个时间调用,可以使用call_later。这个方法的第一个参数是延迟时间(单位为秒),第二个参数是回调。
\'\'\'


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(0.2, callback, "call_later", "0.2s")
    loop.call_later(0.1, callback, "call_later", "0.1s")
    loop.call_soon(callback, "call_soon", "None")
    print("-----------")
    await asyncio.sleep(0.6)


event_loop = asyncio.get_event_loop()
try:
    print("entering event loop")
    event_loop.run_until_complete(main(event_loop))
finally:
    print("closing event loop")
    event_loop.close()

\'\'\'
entering event loop
register callback
-----------
call_soon None
call_later 0.1s
call_later 0.2s
closing event loop
\'\'\'

我们注意一下 main 里面的第二个 print,我们看到无论是 call_soon 还是 call_later 都是在第二个print 结束之后才调用,说明 call_later 和 call_soon 一样,都是在遇到异步 io 阻塞、比如 asyncio.sleep 之后才会执行。

但是值得一提的是,对于 call_later 来说,计时是从注册回调的那一刻就已经开始了。可如果假设执行 call_later 注册的回调需要 3s,但是asyncio.sleep异步阻塞只有 2s,该怎么办呢?那么不好意思,程序会继续往下走,因为asyncio.sleep 结束之后,还需要 1s 才会执行 call_later 指定的回调。所以程序向下执行,直到出现下一个异步 io 阻塞,如果不是异步 io 阻塞的话,那么 call_later 指定的回调也是不会执行的。

因此:执行回调,是指在遇见异步 io 阻塞的时候才会执行。call_soon 是只要遇见异步 io 就会执行,即使遇见异步 io,call_later 已经等待完毕,执行的先后顺序依旧是 call_soon 先执行。我们来验证一下:

import asyncio
import time


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(0.2, callback, "call_later", "0.2s")
    loop.call_later(0.1, callback, "call_later", "0.1s")
    loop.call_soon(callback, "call_soon", "None")
    # time.sleep不是异步io, 它是一个同步io
    time.sleep(1)
    # 当time.sleep(1)之后call_later和call_soon肯定都会执行,因为call_later里面指定的是 0.2 和 0.1, 比1小
    await asyncio.sleep(0.6)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

\'\'\'
register callback
call_soon None
call_later 0.1s
call_later 0.2s
\'\'\'

再来看看 call_later:

import asyncio
import time


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(2, callback, "call_later", "2s")
    print("call_later注册完毕")
    # 这里执行完毕,call_later还没有开始
    await asyncio.sleep(1.5)
    # 1.5 + 1肯定比2大,所以time.sleep(1)之后call_later里面的指定的时间肯定已经过了
    time.sleep(1)
    print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
    print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
    print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
    print("就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞")
    await asyncio.sleep(0.1)
    print("完了,我上面出现了异步io阻塞,我要比call_later指定的回调后执行了")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

\'\'\'
register callback
call_later注册完毕
就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
就算时间过了,我还是比call_later指定的回调先执行,因为没有异步io阻塞
call_later 2s
完了,我上面出现了异步io阻塞,我要比call_later指定的回调后执行了
\'\'\'

再来看个栗子:

import asyncio


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(2, callback, "call_later", "2s")
    print("call_later注册完毕")
    await asyncio.sleep(1)
    print("call_later指定的回调能执行吗")
    print("call_later指定的回调能执行吗")
    print("call_later指定的回调能执行吗")
    print("不能")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

\'\'\'
register callback
call_later注册完毕
call_later指定的回调能执行吗
call_later指定的回调能执行吗
call_later指定的回调能执行吗
不能
\'\'\'

我们看到 call_later 指定的回调没有执行程序就退出了,这是因为 main 里面的代码全部执行完之后 call_later 指定的时间还没有到,所以直接退出了。

import asyncio
import time


def callback(cb, n):
    print(f"{cb} {n}")


async def main(loop):
    print("register callback")
    loop.call_later(2, callback, "call_later", "2s")
    print("call_later注册完毕")
    await asyncio.sleep(1)
    time.sleep(1)
    print("call_later指定的回调能执行吗")
    print("call_later指定的回调能执行吗")
    print("call_later指定的回调能执行吗")
    print("能,因为时间已经到了")


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

\'\'\'
register callback
call_later注册完毕
call_later指定的回调能执行吗
call_later指定的回调能执行吗
call_later指定的回调能执行吗
能,因为时间已经到了
call_later 2s
\'\'\'

当代码全部执行完之后,call_later 指定的时间已经到了,所以会在最后执行它。

call_at

除了 call_soon 瞬间执行,和 call_later 延迟执行之外,还有一个call_at 在指定之间内执行。实现这个目的的循环依赖于一个单调时钟,而不是墙上的时钟时间,以确保 now 时间绝对不会逆转。要为一个调度回调选择时间,必须使用循环的time方法从这个时钟的内部开始。

import asyncio
import time


def callback(cb, loop):
    print(f"callback {cb} invoked at {loop.time()}")


async def main(loop):
    now = loop.time()
    print("clock time:", time.time())
    print("loop time:", now)
    print("register callback")
    # 表示在当前时间(time = loop.time())之后的0.2s执行,个人觉得类似于 call_later
    loop.call_at(now + 0.2, callback, "call_at", loop)
    time.sleep(1)
    print("是先打印我呢?还是先执行call_at或者call_sonn呢")
    await asyncio.sleep(1)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

\'\'\'
clock time: 1573291054.068545
loop time: 160079.265
register callback
是先打印我呢?还是先执行call_at或者call_sonn呢
callback call_at invoked at 160079.453
\'\'\'
# 所以这和call_later也是类似的,都是在遇到io阻塞之后才会执行

以上三者的执行顺序

首先在遇到异步 io 阻塞的时候,call_soon 是立刻执行,call_later 和 call_at 是需要等指定过了才会执行,如果时间没到,那么执行顺序肯定是call_soon最先,这没问题。但是,如果当遇到一个异步io阻塞的时候,call_later 和 call_at 所指定的时间都过了,那么这个三者的执行顺序是怎么样的呢?

import asyncio


def callback(cb):
    print(f"callback {cb}")


async def main(loop):
    now = loop.time()
    loop.call_at(now + 0.2, callback, "call_at")
    loop.call_later(0.2, callback, "call_later")
    loop.call_soon(callback, "call_soon")

    #await asyncio.sleep(1)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

\'\'\'
callback call_soon
\'\'\'

首先我们发现,如果没有异步 io 阻塞,那么最终只有 call_soon 会执行。

import asyncio
import time


def callback(cb):
    print(f"callback {cb}")


async def main(loop):
    now = loop.time()
    loop.call_at(now + 0.3, callback, "call_at")
    loop.call_later(0.2, callback, "call_later")
    loop.call_soon(callback, "call_soon")
    time.sleep(1)
    await asyncio.sleep(1)


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

\'\'\'
callback call_soon
callback call_later
callback call_at
\'\'\'
# 遇到异步io, 那么 call_soon 仍然最先执行
# 至于 call_later 和 call_at, 则是两者指定的时间哪个先到, 先执行哪个

多个task并发执行

首先我们来看一个例子。

import asyncio
import time


async def foo():
    await asyncio.sleep(1)


async def main():
    # 三者是一样的
    await foo()
    await asyncio.create_task(foo())
    await asyncio.ensure_future(foo())


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")  # 总用时:3.0022929

在一个协程中可以使用 await 关键字驱动另一个协程执行,这里我们驱动了三个协程执行,这没问题。但问题是我们发现总用时为 3s,这是为什么?不是说遇见异步 io 会自动切换么?那么整体用时应该还是 1s 才对啊。确实理论上是这样的,但是观察我们的代码是怎么写的,我们三个 await 是分开写的,而且 await 协程 是能得到当前协程的返回值的,如果这个协程都还没有执行完毕、对应的 Task 对象都还没结束、Future 对象还没有 set_result,我们又怎么能拿到呢?还是那句话,异步是在多个协程之间进行切换,至于当前的协程阻塞了只会切换到另一个协程里面去执行,但是对于当前协程来说,该阻塞多长还是阻塞多长,不可能说这一步阻塞还没过去,就直接调到下一行代码去执行,这是不可能的。

因此三个await,必须等第一个 await 完毕之后,才会执行下一个 await。至于我们刚才的call_soon、call_later、call_at,可以看做是另一个协程,在遇到了 asyncio.sleep 之后就切换过去了。但是对于协程本身来说,该asyncio.sleep 多少秒还是多少秒, 只有sleep结束了,才会执行 await asyncio.sleep 下面的代码。还是那句话,切换是指多个协程之间切换,而我们上面代码是两个 await,这两个 await 本身来说相当于还是串行,就是 main 协程里面的两行代码,只有第一个await结束了,才会执行第二个await。

那么问题来了,我们如何才能让这两个协程并发的执行呢?

asyncio.wait

首先是 asyncio.wait

import asyncio
import time


async def task_func():
    await asyncio.sleep(1)


async def main():
    # 将多个协程或者任务放在一个列表里面,传给 asyncio.wait
    # 里面还可以再传其他参数:
    # timeout:超时时间, 如果在这个时间段内任务没有执行完毕, 那么没完成的任务直接取消
    # return_when:FIRST_COMPLETED, 第一个任务完成时结束; FIRST_EXCEPTION, 第一次出现异常时结束; ALL_COMPLETED, 所有任务都完成时结束。
    # 默认是ALL_COMPLETED
    await asyncio.wait([task_func(), task_func()])


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")  # 总用时:1.0012839

我们看到此时就只用了 1s, 因为两个任务(协程被包装成任务)是一起执行的。

那么如何获取任务的返回值呢?

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(1)
    return f"task{n}"


async def main():
    # 这个 wait 函数有两个返回值, 一个是执行状态为完成的Task对象, 一个是未完成的Task对象
    finished, pending = await asyncio.wait([task_func(_) for _ in range(1, 5)])
    # 我们说过一旦任务完成,就会通过 future 内部的 set_result方法设置返回值
    # 然后我们通过 future.result() 就能拿到返回值, 而 Task 是 Future 的子类, 可以直接通过 Task 对象调用
    print(f"results: {[task.result() for task in finished]}")


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")  
"""
results: [\'task2\', \'task3\', \'task1\', \'task4\']
总用时:1.0015823
"""

但是我们发现执行的顺序貌似不是我们添加的顺序,因此 wait 返回的 future 的顺序是无序的,如果希望有序,那么需要使用另一个函数。

可能有人觉得这个 pending 是不是有点脱裤子放屁的感觉的,asyncio.wait 会等到所有的任务都完成,而 pending 又表示没有完成的任务,这不矛盾吗?答案是不矛盾,因为我们 asyncio.wait 内部可以接收一个超时时间,时间一到,没有执行完的任务会直接被取消掉。

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(n)
    return f"task{n}"


async def main():
    finished, pending = await asyncio.wait([task_func(_) for _ in range(1, 5)], timeout=3.9)
    print(f"results: {[future.result() for future in finished]}")
    print(len(pending))  # 1


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")
"""
results: [\'task3\', \'task1\', \'task2\']
1
总用时:3.9053138
"""

我们看到有一个未完成的任务。

asyncio.gather

asyncio.gather 可以保证返回的结果有序。

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(1)
    return f"task{n}"


async def main():

    # gather只有一个返回值,直接返回已完成的任务的返回值,注意是返回值,不是任务,也就是说返回的是future.result() 或者 task.result()
    # 但是传递的时候就不要传递列表,而是需要传递一个个的task,因此我们这里要将列表打散
    finished = await asyncio.gather(*[task_func(_) for _ in range(1, 5)])
    print(f"results: {[res for res in finished]}")


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")
"""
results: [\'task1\', \'task2\', \'task3\', \'task4\']
总用时:1.0012363
"""

使用 gather 是可以保证顺序的,顺序就是我们添加任务的顺序。

但这里还有一个问题,如果我们执行的任务里面报错了该怎么办?我们来看一下:

import time
import asyncio


async def f1():
    1 / 0
    return "f1"

async def f2():
    time.sleep(1)
    print("我是 f2")
    return "f2"

async def f3():
    time.sleep(1)
    print("我是 f3")
    return "f3"

async def main():
    finished = await asyncio.gather(f1(), f2(), f3())
    print(finished)

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
"""
我是 f2
Traceback (most recent call last):
  File "D:/satori/1.py", line 24, in <module>
    loop.run_until_complete(main())
  File "C:\\python38\\lib\\asyncio\\base_events.py", line 616, in run_until_complete
    return future.result()
  File "D:/satori/1.py", line 20, in main
    finished = await asyncio.gather(f1(), f2(), f3())
  File "D:/satori/1.py", line 6, in f1
    1 / 0
ZeroDivisionError: division by zero
我是 f3
"""

首先 f1 中出现了除零异常,如果一个任务出现了异常,那么会导致整体异常。但是一个任务出现了异常,并不代表其它的任务就不执行了,从结果上看 f2 和 f3 都已经执行完毕了。但是问题来了,如果我不希望一个任务失败而导致整体异常,该怎么做呢?

import time
import asyncio


async def f1():
    1 / 0
    return "f1"

async def f2():
    time.sleep(1)
    print("我是 f2")
    return "f2"

async def f3():
    time.sleep(1)
    print("我是 f3")
    return "f3"

async def main():
    # asyncio.gather 内部有一个参数 return_exceptions, 默认是 False
    # 如果设置为 True 的话, 那么在失败的时候会将异常返回
    finished = await asyncio.gather(f1(), f2(), f3(), return_exceptions=True)
    print(finished)
    print(finished[0], type(finished[0]))

loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
"""
我是 f2
我是 f3
[ZeroDivisionError(\'division by zero\'), \'f2\', \'f3\']
division by zero <class \'ZeroDivisionError\'>
"""

我们看到当失败的时候,返回值就是对应的异常。

除此之外,asyncio.gather 还可以进行分组,举个栗子:

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(1)
    return f"task{n}"


async def main():

    group1 = asyncio.gather(*[task_func(_) for _ in range(1, 5)])
    group2 = asyncio.gather(*[task_func(_) for _ in range(1, 5)])
    finished = await asyncio.gather(group1, group2)
    print(f"results: {[res for res in finished]}")


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")
"""
results: [[\'task1\', \'task2\', \'task3\', \'task4\'], [\'task1\', \'task2\', \'task3\', \'task4\']]
总用时:1.0011522
"""

我们看到此时的 finished 就是一个包含列表的列表,里面的列表就是每一个组的结果。

asyncio.as_completed

我们看到这个 wait 类似于 concurrent.futures 里面的 submit,gather 类似于 map,而 concurrent.futures 里面还有一个 as_completed,那么同理 asyncio 里面也有一个 as_completed。另外个人觉得 asyncio 借鉴了 concurrent.futures 里的不少理念,而且 wai t里面还有一个 return_when,这个里面的参数,内部就是从 concurrent.futures 包里面导入的。

那这个函数是用来干什么的呢?从名字也能看出来,是哪个先完成哪个就先返回。as_completed 函数调用后返回一个生成器,会管理指定的一个协程列表,并生成它们的结果,每个协程结束运行时一次生成一个结果。与 wait 类似,as_completed 不能保证顺序,从名字也能看出来,哪个先完成哪个先返回。

import asyncio
import time


async def task_func(n):
    await asyncio.sleep(n)
    return f"task{n}"


async def main():
    # 同样需要传递一个列表, 里面同样可以指定超时时间
    completed = asyncio.as_completed([task_func(2), task_func(1), task_func(3), task_func(4)])
    # 遍历每一个task,进行await,哪个先完成,就先返回
    for task in completed:
        res = await task
        print(res)


start = time.perf_counter()
asyncio.run(main())
print(f"总用时:{time.perf_counter() - start}")
"""
task1
task2
task3
task4
总用时:4.0048034999999995
"""

同步原语

尽管asyncio应用通常作为单线程的进程运行,不过仍被构建为并发应用。由于I/O以及其他外部事件的延迟和中断,每个协程或任务可能按照一种不可预知的顺序执行,为了支持安全的并发执行,asyncio 包含了 threading 和 multiprocessing 模块中一些底层原语的实现。

Lock 可以用来保护对一个共享资源的访问,只有锁的持有者可以使用这个资源。如果有多个请求要得到这个锁,那么其将会阻塞,以保证一次只有一个持有者。

import asyncio


def unlock(lock):
    print("回调释放锁,不然其他协程获取不到。")
    print("但我是1秒后被调用,锁又在只能通过调用我才能释放,所以很遗憾,其他协程要想执行,至少要1秒后了")
    lock.release()


async def coro1(lock):
    print("coro1在等待锁")
    # 使用async with语句很方便,是一个上下文。
    # 我们知道在多线程中,也可以使用with,相当于开始的lock.acquire和结尾lock.release
    # 那么在协程中,也有await lock.acquire和lock.release,以及专业写法async with
    async with lock:
        print("coro1获得了锁")
        print("coro1释放了锁")


async def coro2(lock):
    print("coro2在等待锁")
    # 使用await lock.acquire()和lock.release()这种方式也是一样的
    await lock.acquire()
    print("coro2获得了锁")
    print("coro2释放了锁")
    # 注意release是不需要await的
    lock.release()


async def main(loop):
    # 创建共享锁
    lock = asyncio.Lock()

    print("在开始协程之前创建一把锁")
    await lock.acquire()  # 这里先把锁给锁上
    print("锁是否被获取:", lock.locked())

    # 执行回调将锁释放,不然协程无法获取锁
    loop.call_later(1, unlock, lock)

    # 运行想要使用锁的协程
    print("等待所有协程")
    await asyncio.wait([coro1(lock), coro2(lock)])



loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
在开始协程之前创建一把锁
锁是否被获取: True
等待所有协程
coro2在等待锁
coro1在等待锁
回调释放锁,不然其他协程获取不到。
但我是1秒后被调用,锁又在只能通过调用我才能释放,所以很遗憾,其他协程要想执行,至少要1秒后了
coro2获得了锁
coro2释放了锁
coro1获得了锁
coro1释放了锁
"""

事件

和线程一样,协程里面也有事件的概念。asyncio.Event 基于 threading.Event,它允许多个消费者等待某个事件发生。Event 对象可以使用 set、wait、clear。

  • set:设置标志位,调用is_set可以查看标志位是否被设置。一个刚创建的Event对象默认是没有设置的
  • wait:等待,在没有调用set的情况下,会阻塞。如果设置了set,wait则不会阻塞
  • clear:清空标志位
import asyncio


def set_event(event):
    print("设置标志位,因为协程会卡住,只有设置了标志位才会往下走")
    print("但我是一秒后才被调用,所以协程想往下走起码也要等到1秒后了")
    event.set()


async def coro1(event):
    print("coro1在这里卡住了,快设置标志位啊")
    await event.wait()
    print(f"coro1飞起来了,不信你看现在标志位,是否设置标志位:{event.is_set()}")


async def coro2(event):
    print("coro2在这里卡住了,快设置标志位啊")
    await event.wait()
    print(f"coro2飞起来了,不信你看现在标志位,是否设置标志位:{event.is_set()}")


async def main(loop):
    # 创建共享事件
    event = asyncio.Event()
    # 现在设置标志位了吗?
    print("是否设置标志位:", event.is_set())

    # 执行回调将标志位设置,不然协程卡住了
    loop.call_later(1, set_event, event)

    # 运行卡住的的协程
    print("等待所有协程")
    await asyncio.wait([coro1(event), coro2(event)])


loop = asyncio.get_event_loop()
loop.run_until_complete(main(loop))
"""
是否设置标志位: False
等待所有协程
coro2在这里卡住了,快设置标志位啊
coro1在这里卡住了,快设置标志位啊
设置标志位,因为协程会卡住,只有设置了标志位才会往下走
但我是一秒后才被调用,所以协程想往下走起码也要等到1秒后了
coro2飞起来了,不信你看现在标志位,是否设置标志位:True
coro1飞起来了,不信你看现在标志位,是否设置标志位:True
"""

队列

asyncio.Queue 为协程提供了一个先进先出的数据结构,这与线程的 queue.Queue 或者进程里面的 Queue 很类似。

import asyncio
import time


async def consumer(q: asyncio.Queue, n):
    print(f"消费者{n}号 开始")
    while True:
        await asyncio.sleep(2)
        item = await q.get()
        # 由于我们要开启多个消费者, 为了让其停下来, 我们添加None作为停下来的信号
        if item is None:
            # task_done是什么意思? 队列有一个属性, 叫做unfinished_tasks
            # 每当我们往队列里面put一个元素的时候, 这个值就会加1,
            q.task_done()
            # 并且队列还有一个join方法, 调用之后会一直阻塞, 什么时候不阻塞呢? 当 unfinished_tasks 为 0 的时候。
            # 但是我们每put一个元素的时候, unfinished_tasks都会加 1
            # 而 get 一个元素的时候, unfinished_tasks 不会自动减 1
            # get方法不会自动帮我们做这件事,需要手动调用task_done方法实现
            break
        print(f"消费者{n}号: 消费元素{item}")
        q.task_done()


async def producer(q: asyncio.Queue, consumer_num):
    print(f"生产者 开始")
    for i in range(1, 10):
        await q.put(i)
        print(f"生产者: 生产元素{i}, 并放在了队列里")
    # 为了让消费者停下来, 我就把None添加进去吧
    # 开启几个消费者, 就添加几个None
    for i in range(consumer_num):
        await q.put(None)

    # 等待所有消费者执行完毕
    # 只要unfinished_tasks不为0,那么q.join就会卡住,直到消费者全部消费完为止
    await q.join()
    print("生产者生产的东西全被消费者消费了")


async def main(consumer_num):
    q = asyncio.Queue()
    consumers = [consumer(q, i) for i in range(consumer_num)]
    await asyncio.wait(consumers + [producer(q, consumer_num)])


start = time.perf_counter()
asyncio.run(main(3))
print(f"总用时:{time.perf_counter() - start}")
"""
消费者1号 开始
生产者 开始
生产者: 生产元素1, 并放在了队列里
生产者: 生产元素2, 并放在了队列里
生产者: 生产元素3, 并放在了队列里
生产者: 生产元素4, 并放在了队列里
生产者: 生产元素5, 并放在了队列里
生产者: 生产元素6, 并放在了队列里
生产者: 生产元素7, 并放在了队列里
生产者: 生产元素8, 并放在了队列里
生产者: 生产元素9, 并放在了队列里
消费者0号 开始
消费者2号 开始
消费者1号: 消费元素1
消费者0号: 消费元素2
消费者2号: 消费元素3
消费者1号: 消费元素4
消费者0号: 消费元素5
消费者2号: 消费元素6
消费者1号: 消费元素7
消费者0号: 消费元素8
消费者2号: 消费元素9
生产者生产的东西全被消费者消费了
总用时:7.989401599999999
"""

我们对队列进行循环,然后 await 的时候,实际上有一个更加 pythonic 的写法,也就是 async for。

import asyncio
import time
from tornado.queues import Queue
from tornado import gen


# 注意 asyncio 中的 Queue 不支持 async for,我们需要使用 tornado 中的 Queue
async def consumer(q: Queue, n):
    print(f"消费者{n}号 开始")
    async for item in q:
        await gen.sleep(2)
        if item is None:
            q.task_done()
            break
        print(f"消费者{n}号: 消费元素{item}")
        q.task_done()


async def producer(q: Queue, consumer_num):
    print(f"生产者 开始")
    for i in range(1, 10):
        await q.put(i)
        print(f"生产者: 生产元素{i},并放在了队列里")
    for i in range(consumer_num):
        await q.put(None)

    await q.join()
    print("生产者生产的东西全被消费者消费了")


async def main(consumer_num):
    q = Queue()
    consumers = [consumer(q, i) for i in range(consumer_num)]
    await asyncio.wait(consumers + [producer(q, consumer_num)])


start = time.perf_counter()
asyncio.run(main(3))
print(f"总用时:{time.perf_counter() - start}")
"""
消费者1号 开始
生产者 开始
生产者: 生产元素1,并放在了队列里
生产者: 生产元素2,并放在了队列里
生产者: 生产元素3,并放在了队列里
生产者: 生产元素4,并放在了队列里
生产者: 生产元素5,并放在了队列里
生产者: 生产元素6,并放在了队列里
生产者: 生产元素7,并放在了队列里
生产者: 生产元素8,并放在了队列里
生产者: 生产元素9,并放在了队列里
消费者0号 开始
消费者2号 开始
消费者0号: 消费元素2
消费者2号: 消费元素3
消费者1号: 消费元素1
消费者0号: 消费元素4
消费者2号: 消费元素5
消费者1号: 消费元素6
消费者0号: 消费元素7
消费者2号: 消费元素8
消费者1号: 消费元素9
生产者生产的东西全被消费者消费了
总用时:8.0046028
"""

协程与线程结合

如果出现了一个同步耗时的任务,我们可以将其扔到线程池里面去运行。对于协程来说,仍然是单线程的,我们是可以将耗时的任务单独开启一个线程来执行的。

import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def foo(n):
    time.sleep(n)
    print(f"foo睡了{n}秒")


async def bar():
    await asyncio.sleep(3)
    return "bar"


async def main():
    # 线程池最多装两个任务
    executor = ThreadPoolExecutor(max_workers=2)
    # loop.run_in_executor 表示扔到线程池里面运行, 这个过程是瞬间返回的
    loop.run_in_executor(executor, foo, 3)
    loop.run_in_executor(executor, foo, 2)
    print("瞬间返回")
    res = await bar()
    print(res)


loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
print(f"总用时:{time.perf_counter() - start}")
"""
瞬间返回
foo睡了2秒
foo睡了3秒
bar
总用时:3.0015592
"""

所以 run_in_executor 相当于将耗时的任务单独丢到一个线程中执行,但是它不会创建任务,而是返回一个 future,因此它不会等待,而是会立刻向下执行。如果我们希望等待它完成之后再执行下面的逻辑呢?

import asyncio
import time


def foo(n):
    time.sleep(n)
    return f"foo睡了{n}秒"


async def bar():
    await asyncio.sleep(1)
    return "bar"


async def main():
    # 这里不创建线程池也是可以的, 传递一个 None 的话会默认创建
    # 可以调用 await, 会等待耗时任务完成, 同时拿到返回值
    print(await loop.run_in_executor(None, foo, 3))
    print("瞬间返回吗?")
    res = await bar()
    print(res)


loop = asyncio.get_event_loop()
start = time.perf_counter()
loop.run_until_complete(main())
print(f"总用时:{time.perf_counter() - start}")
"""
foo睡了3秒
瞬间返回吗?
bar
总用时:4.0045044
"""

同步 3s,加上异步 1s,所以总共 4s。因此即便扔到线程池中,也是可以等待它完成的,但是这样做没有什么意义,因为你既然要等待的话,那干脆还不如直接调用,有啥必要扔到线程池中呢?

但我之所以说这一点,主要是想表明 run_in_executor 仍然会返回一个 future,因此这就意味着耗时的任务执行完毕之前,循环是不可以关闭的。

import asyncio
import time


def foo(n):
    time.sleep(n)
    return f"foo睡了{n}秒"


async def bar():
    await asyncio.sleep(1)
    return "bar"


async def main():
    loop.run_in_executor(None, foo, 3)
    res = await bar()
    print(res)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())
loop.close()
"""
bar
exception calling callback for <Future at 0x152bd9ccf10 state=finished returned str>
Traceback (most recent call last):
  File "C:\\python38\\lib\\concurrent\\futures\\_base.py", line 328, in _invoke_callbacks
    callback(self)
  File "C:\\python38\\lib\\asyncio\\futures.py", line 374, in _call_set_state
    dest_loop.call_soon_threadsafe(_set_state, destination, source)
  File "C:\\python38\\lib\\asyncio\\base_events.py", line 764, in call_soon_threadsafe
    self._check_closed()
  File "C:\\python38\\lib\\asyncio\\base_events.py", line 508, in _check_closed
    raise RuntimeError(\'Event loop is closed\')
RuntimeError: Event loop is closed
"""

我们看到循环关闭之后,耗时的任务还没有完成,而主线程默认会等待所有子线程执行结束。于是当耗时任务执行完毕之后,返回一个 future,在解析这个 future 的时候发现循环已经关闭了,因此就报错了。

所以我们不能用 asyncio.run,因为它的内部自动包含了 loop.close() 逻辑,如果当耗时任务用时比较长的时候,那么关闭循环之后会报错,但是报错也是在任务执行完毕、返回 future 之后才会报错。

另外,我们有多个同步耗时任务需要扔到线程池中的话,那么最好事先创建一个线程池。因为如果不指定,那么每一个耗时的任务都会创建一个线程池。

import asyncio
from concurrent.futures import ThreadPoolExecutor
import time


def foo(n):
    time.sleep(n)
    return f"foo睡了{n}秒"


async def bar():
    await asyncio.sleep(1)
    return "bar"


async def main():
    executor = ThreadPoolExecutor(max_workers=2)
    # 我们可以在调用 run_in_executor 传入 executor, 也可以通过 set_default_executor 进行设置
    # 这样的话, 后面就会调用 run_in_executor 的时候就会使用我们创建的线程池
    loop.set_default_executor(executor)
    loop.run_in_executor(None, foo, 3)
    loop.run_in_executor(None, foo, 3)
    loop.run_in_executor(None, foo, 3)
    res = await bar()
    print(res)


loop = asyncio.get_event_loop()
loop.run_until_complete(main())

关于 async with 和 async for

如果让你定义一个类支持一个 with 和 for 语句,我相信肯定没有问题,但是 async with 和 async for 呢?我们要怎么实现呢?

async with

我们知道自定义一个类支持 with 语句,需要实现 __enter____exit__ 这两个魔法方法,那么如果想支持 async with,则需要实现 __aenter____aexit__

import asyncio


class Open:

    def __init__(self, file, mode="r", encoding="utf-8"):
        self.file = file
        self.mode = mode
        self.encoding = encoding
        self.__fd = None

    # 要使用 async def 定义
    async def __aenter__(self):
        self.__fd = open(file=self.file, mode=self.mode, encoding=self.encoding)
        return self.__fd

    # 同样使用 async def 定义
    async def __aexit__(self, exc_type, exc_val, exc_tb):
        self.__fd.close()


# 既然是async就必须要创建协程,扔到事件循环里面运行
async def main():
    async with Open("白色相簿.txt") as f:
        print(f.read())


asyncio.run(main())
"""
为什么你那么熟练啊
"""

可以看到我们自己实现了一个 async with,但是注意这个不是异步的,我们还是调用了底层的 open 函数。当然还可以使用 contextlib:

import asyncio
import contextlib


@contextlib.asynccontextmanager
async def foo():
    print("xxx")
    l = list()
    yield l 
    print(l)


async def main():
    async with foo() as l:
        l.append(1)
        l.append(2)
        l.append(3)


asyncio.run(main())
"""
xxx
[1, 2, 3]
"""

async for

我们知道自定义一个类支持 for 语句,需要实现 __iter____next__ 这两个魔法方法,那么如果想支持 async for,则需要实现 __aiter____anext__

import asyncio


class A:

    def __init__(self):
        self.l = [1, 2, 3, 4]
        self.__index = 0

    # 注意:定义 __aiter__ 是不需要 async 的
    def __aiter__(self):
        return self

    # 但是定义 __anext__ 需要 async
    async def __anext__(self):
        try:
            res = self.l[self.__index]
            self.__index += 1
            return res
        except IndexError:
            # 捕获异常,协程则要raise一个StopAsyncIteration
            raise StopAsyncIteration


async def main():
    async for _ in A():
        print(_)


asyncio.run(main())
"""
1
2
3
4
"""

另外我们知道,可以对 for 循环可以作用于生成器,那么 async for 则也可以作用于异步生成器中。

import asyncio


# 具体版本记不清了,不知是3.5还是3.6,记得那时候引入async和await的时候,python是不允许async和yield两个关键字同时出现的
# 但是现在至少python3.7是允许的,这种方式叫做异步生成器。
# 但是注意:如果async里面出现了yield,那么就不可以有return xxx了。
async def foo():
    yield 123
    yield 456
    yield 789
    print("xxx")


async def main():
    async for _ in foo():
        print(_)


asyncio.run(main())
"""
123
456
789
xxx
"""

await

很多人可能对 Python 中的 await 这个关键字很懵逼,到底什么对象才可以被 await 呢?

从抽象基类的源码中我们可以看到一个对象如果想被 await,就必须要实现 __await__ 这个魔法方法。

import asyncio


class A:

    def __await__(self):
        return "xxx"


async def main():
    res = await A()
    print(res)

try:
    asyncio.run(main())
except Exception as e:
    print(e)  # __await__() returned non-iterator of type \'str\'

但是它报错了,意思是必须返回一个迭代器。

import asyncio


class A:

    def __await__(self):
        return "xxx".__iter__()


async def main():
    res = await A()
    print(res)

try:
    asyncio.run(main())
except Exception as e:
    print(e)  # Task got bad yield: \'x\'

说明要返回一个迭代器,然后 yield,但是这里提示我们 Task got bad yield: \'x\'。我们来分析一下这句话,bad yield: \'x\',肯定是告诉我们 yield 出了一个不好的值,这个不好的值被 Task 获取了,也就是不应该给 Task 一个 \'x\'。咦,Task,这是啥?我们首先想到了 asyncio 里面 task,而 task 对应的类正是 Task,这是不是说明我们返回一个Task对象就是可以了。

import asyncio


async def foo():
    return "我是foo"


class A:

    def __await__(self):
        # 同样需要调用__iter__
        return asyncio.ensure_future(foo()).__iter__()


async def main():
    res = await A()
    print(res)

try:
    asyncio.run(main())
except Exception as e:
    print(e)  
"""
我是foo
"""
# 可以看到成功执行, 其实兜了这么大圈子,完全没必要
# await 后面肯定是一个 task(或者future), 而我们 await 的是A(), 那么A()的 __await__ 方法返回的肯定是一个task(或者future)
# await后面如果是coroutine, 会自动包装成 task
# 但是用我们自己的类返回的话, 那么我们必须在 __await__ 中手动的使用 ensure_future 或者 create_task 进行包装, 再返回其迭代器

手动实现异步

尽管我们实现了 async for、async with、await 等方法,但是它们并没有达到异步的效果,比如之前的 async for 底层还是使用了 open。再比如网络请求,对于像 requests 这种库,是属于同步的,因此在协程函数中使用requests.get 是

以上是关于asyncio--python3未来并发编程主流充满野心的模块的主要内容,如果未能解决你的问题,请参考以下文章

编程求职之路

为什么 Java 仍将是未来的主流语言?

编程语言趋势预测:Rust 将成为主流,React 继续统治编程世界

Rust 变成主流?GraphQL 持续走高?2020年编程新趋势都有啥?

编程语言的发展趋势及未来方向

Go并发编程

(c)2006-2024 SYSTEM All Rights Reserved IT常识