Python Module: asyncio Coroutine Concurrency

Posted 云物互联

Python Co-routines

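Python's support for coroutines has evolved over several versions:

  1. Python 2.x: support is limited. Generators built on the yield keyword (with send() since Python 2.5) provide part of what coroutines need, but not all of it.
  2. The third-party library gevent, built on greenlets, offers much better coroutine support.
  3. Python 3.4 added the asyncio module.
  4. Python 3.5 introduced the async/await syntax.
  5. In Python 3.6 the asyncio module became more complete and stable (it is no longer provisional), and asynchronous generators and comprehensions were added.
  6. In Python 3.7, async and await became reserved keywords.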

Asyncio Module

asyncio — Asynchronous I/O, event loop, coroutines and tasks

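  • Asynchronous I/O: the program issues an I/O operation and, instead of blocking until the result arrives, gives up control so the CPU can do other work in the meantime, which improves throughput.
  • Event loop: an effective way to handle many concurrent operations. The program runs an (in principle) endless loop that monitors events and dispatches them. Coroutine objects are registered with the event loop, and when the event a coroutine is waiting for occurs, the loop resumes that coroutine.
  • Coroutines: a coroutine function is declared with async def. Calling it does not execute its body immediately; it returns a coroutine object, which must be registered with an event loop and is then run by the loop.
  • Tasks: a further wrapper around a coroutine object that tracks the task's state and supports management operations such as creating and cancelling tasks. When a coroutine is passed to run_until_complete(), it is wrapped in a Task object.
  • Futures (the result of a computation that will finish later): Future is the parent class of Task and stores the result of a Task's execution.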
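To make these terms concrete, here is a minimal sketch (assuming Python 3.7+; the coroutine name add is a hypothetical example) showing that calling a coroutine function only creates a coroutine object, that wrapping it yields a Task, and that Task is a Future subclass which holds the result:

```python
import asyncio
import inspect


async def add(x, y):
    # Hypothetical coroutine function: calling it does not run this body
    return x + y


coro = add(1, 2)
print(inspect.iscoroutine(coro))            # True: only a coroutine object so far

loop = asyncio.new_event_loop()
try:
    task = loop.create_task(coro)            # wrap the coroutine in a Task
    print(isinstance(task, asyncio.Future))  # True: Task subclasses Future
    print(task.done())                       # False: scheduled, not yet run
    loop.run_until_complete(task)
    print(task.done(), task.result())        # True 3
finally:
    loop.close()
```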

Event Loop

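At the core of the asyncio programming model is a message-driven event loop: the program obtains a reference to an event loop from the asyncio module and hands it the coroutines to run. The loop runs them, and that is how asynchronous I/O is achieved.

The asyncio event loop offers several ways to run coroutines; the simplest is the run_until_complete() method.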

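import asyncio


async def coroutine():
    print('in coroutine')
    return 'result'

coro = coroutine()  # creates a coroutine object; nothing runs yet

# new_event_loop() is used instead of get_event_loop(), which is
# deprecated outside a running loop on recent Python versions
event_loop = asyncio.new_event_loop()
try:
    print('entering event loop')
    result = event_loop.run_until_complete(coro)
    print(f'it returned: {result}')
finally:
    print('closing event loop')
    event_loop.close()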

OUTPUT:

entering event loop
in coroutine
it returned: result
closing event loop
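On Python 3.7 and later, the same program can be written more compactly with asyncio.run(), which creates a new event loop, runs the coroutine to completion and closes the loop. A minimal sketch, assuming Python 3.7+:

```python
import asyncio


async def coroutine():
    print('in coroutine')
    return 'result'


# asyncio.run() replaces the manual new loop / run_until_complete() / close()
# sequence: it creates a fresh loop, runs the coroutine, then closes the loop
result = asyncio.run(coroutine())
print(f'it returned: {result}')
```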

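Async and Await

The async keyword defines coroutines: calling a function declared with async def returns a coroutine object, which must be registered with an event loop and is run by the loop. Depending on the scenario, it is used in three forms: async def, async for, and async with.

The await keyword suspends the current coroutine while it waits for an awaitable, such as an I/O operation. It plays the same role as yield in a generator: the coroutine gives up control. In other words, when a coroutine reaches an await whose result is not yet available, the event loop suspends it and runs other ready coroutines until they in turn suspend or finish; the suspended coroutine is resumed once the awaited operation has completed.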
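A minimal sketch of this hand-over of control, assuming Python 3.7+ (worker, A and B are hypothetical names for illustration): two coroutines run concurrently, and each await asyncio.sleep() lets the other proceed, so B, with the shorter delay, finishes first even though A started first.

```python
import asyncio

events = []  # records the order in which the steps run


async def worker(name, delay):
    events.append(f'{name} start')
    # await suspends this coroutine; the loop runs other ready coroutines meanwhile
    await asyncio.sleep(delay)
    events.append(f'{name} end')


async def main():
    # gather() schedules both workers as tasks so they run concurrently
    await asyncio.gather(worker('A', 0.2), worker('B', 0.1))


asyncio.run(main())
print(events)  # ['A start', 'B start', 'B end', 'A end']
```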

async def

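async def declares a coroutine function. Calling a coroutine function does not execute its body immediately; it returns a coroutine object. A simple example was shown above.

Notably, async def supports chained coroutines: a parent coroutine can await a child coroutine, which can in turn await another, forming a call chain of several coroutines that execute sequentially.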

import asyncio


async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.new_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

OUTPUT:

Compute 1 + 2 ...
1 + 2 = 3
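Because each await in a chain waits for the awaited coroutine to finish, the steps run one after another. The sketch below assumes Python 3.7+; chained is a hypothetical name, compute() is a simplified version without the print, and the 0.1 s sleep stands in for real I/O. The second call consumes the first call's result, so the total time is roughly the sum of both sleeps:

```python
import asyncio
import time


async def compute(x, y):
    await asyncio.sleep(0.1)  # stand-in for real I/O
    return x + y


async def chained():
    # Each await completes before the next line runs: about 0.2 s in total
    a = await compute(1, 2)
    b = await compute(a, 3)
    return b


start = time.perf_counter()
result = asyncio.run(chained())
elapsed = time.perf_counter() - start
print(result, round(elapsed, 1))
```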

async for

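async for iterates over an asynchronous iterable, and the most convenient way to produce one is an asynchronous generator (an async def function that contains yield). Before Python 3.6, writing an asynchronous iterator was cumbersome: you had to define a class implementing the __aiter__ and __anext__ methods before you could use it.

Since Python 3.6, an asynchronous generator consumed with async for does the job simply, and asynchronous comprehensions are supported as well.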

import asyncio


async def g2():
    yield 1
    yield 2


async def g1():
    async for v in g2():
        print(v)
    return [v * 2 async for v in g2()]


loop = asyncio.new_event_loop()
try:
    result = loop.run_until_complete(g1())
    print(f'Result is {result}')
finally:
    loop.close()
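For comparison, this is roughly what the class-based approach looks like: a sketch of an asynchronous iterator equivalent to g2(), assuming Python 3.6+ for the asynchronous comprehension (the class name Count is hypothetical). __aiter__ returns the iterator itself, and __anext__ is a coroutine that raises StopAsyncIteration when the iterator is exhausted:

```python
import asyncio


class Count:
    """Hypothetical async iterator yielding 1..n, written the class-based way."""

    def __init__(self, n):
        self.n = n
        self.i = 0

    def __aiter__(self):
        # A plain method that returns the async iterator (here, self)
        return self

    async def __anext__(self):
        if self.i >= self.n:
            raise StopAsyncIteration  # ends the async for loop
        self.i += 1
        await asyncio.sleep(0)  # a real iterator would await I/O here
        return self.i


async def main():
    return [v * 2 async for v in Count(2)]


result = asyncio.run(main())
print(result)  # [2, 4]
```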

async with

with is Python's syntax for context management (context managers); async with is its asynchronous counterpart.

  • A type usable with with (it implements __enter__ and __exit__):
class Sample:

    def __enter__(self):
        print("in __enter__")
        return "Foo"  # bound to the name after "as"

    def __exit__(self, exc_type, exc_val, exc_tb):
        print("in __exit__")


def get_sample():
    return Sample()


with get_sample() as sample:
    print("Sample:", sample)

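Note that async with likewise requires a type that implements the asynchronous context manager protocol (__aenter__ and __aexit__). A typical third-party library with native async with support is aiohttp.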

import asyncio

import aiohttp


async def fetch_page(url):
    # Both ClientSession and the response are asynchronous context managers
    async with aiohttp.ClientSession() as session:
        async with session.get(url) as response:
            return await response.json()


loop = asyncio.new_event_loop()
result = loop.run_until_complete(fetch_page('http://httpbin.org/get?a=2'))
print(f"Args: {result.get('args')}")
loop.close()
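If no library provides one, an asynchronous context manager can be written by hand. A minimal sketch, assuming Python 3.7+ for contextlib.asynccontextmanager; AsyncSample and async_sample are hypothetical names mirroring the synchronous Sample class above. The class form implements the coroutines __aenter__ and __aexit__, while the decorator form builds the same behavior from an async generator:

```python
import asyncio
from contextlib import asynccontextmanager

log = []


class AsyncSample:
    # async with requires __aenter__ and __aexit__, both coroutines
    async def __aenter__(self):
        log.append('enter')
        await asyncio.sleep(0)  # e.g. open a connection asynchronously
        return 'Foo'

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        log.append('exit')
        return False  # do not suppress exceptions


@asynccontextmanager
async def async_sample():
    # Generator-based alternative: code before yield is the "enter" part
    log.append('enter2')
    try:
        yield 'Bar'
    finally:
        log.append('exit2')


async def main():
    async with AsyncSample() as a:
        log.append(a)
    async with async_sample() as b:
        log.append(b)


asyncio.run(main())
print(log)  # ['enter', 'Foo', 'exit', 'enter2', 'Bar', 'exit2']
```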

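Future and Task

As noted earlier, the coroutine object returned by an async def function cannot run on its own. Once it is handed to the event loop, run_until_complete() wraps it in a Task object.

Task is in turn a subclass of Future. Future holds the state and the result of the Task once it has run, so a Future object can be used to retrieve the coroutine's result at a later point.

When needed, a Task object can also be created explicitly. Right after creation, before the loop has run it, the task is pending; once it completes, it is finished.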

import asyncio
import time


now = lambda: time.time()


async def do_some_work(x):
    print("waiting:", x)

start = now()
coroutine = do_some_work(2)
loop = asyncio.new_event_loop()

task = loop.create_task(coroutine)
print(task)

loop.run_until_complete(task)
print(task)

print("Time:", now()-start)

OUTPUT:

<Task pending coro=<do_some_work() running at test_asyncio.py:8>>
waiting: 2
<Task finished coro=<do_some_work() done, defined at test_asyncio.py:8> result=None>
Time: 0.0010194778442382812

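Besides loop.create_task(), asyncio.ensure_future(coroutine) can also wrap a coroutine object in a Task object, but the two suit different situations. create_task() accepts only coroutines, whereas ensure_future() accepts any awaitable and guarantees that a Future comes back: if it is given a Future (or Task) it returns that object unchanged. Since Python 3.7, asyncio.create_task() is the recommended way to create tasks from inside a coroutine.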

  • loop.create_task
AbstractEventLoop.create_task(coro)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

Third-party event loops can use their own subclass of Task for interoperability. In this case, the result type is a subclass of Task.

This method was added in Python 3.4.2. Use the async() function to support also older Python versions.
  • asyncio.ensure_future
asyncio.ensure_future(coro_or_future, *, loop=None)
Schedule the execution of a coroutine object: wrap it in a future. Return a Task object.

If the argument is a Future, it is returned directly.
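The difference described above can be checked directly. A minimal sketch, assuming Python 3.7+ (work is a hypothetical coroutine): passing a Future to ensure_future() returns the very same object, while passing a coroutine wraps it in a new Task, just as create_task() would:

```python
import asyncio


async def work():
    return 42


async def main():
    loop = asyncio.get_running_loop()
    fut = loop.create_future()
    # A Future (or Task) passed to ensure_future() is returned unchanged
    same = asyncio.ensure_future(fut) is fut
    # A coroutine is wrapped in a new Task
    task = asyncio.ensure_future(work())
    is_task = isinstance(task, asyncio.Task)
    return same, is_task, await task


same, is_task, value = asyncio.run(main())
print(same, is_task, value)  # True True 42
```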

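Concurrent Execution

None of the previous examples showed much concurrency. asyncio provides two functions for waiting on several awaitables at once: asyncio.wait(tasks) and asyncio.gather(*coroutines). wait() takes an iterable of Tasks/Futures and returns a tuple of two sets, (done, pending), with no ordering guarantee; gather() takes any number of coroutines or futures as positional arguments and returns a list of their results in the same order as the arguments.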

import asyncio
import time


now = lambda: time.time()


async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


start = now()

coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(4)

# Create the loop first so the tasks are attached to it
loop = asyncio.new_event_loop()
tasks = [
    asyncio.ensure_future(coroutine1, loop=loop),
    asyncio.ensure_future(coroutine2, loop=loop),
    asyncio.ensure_future(coroutine3, loop=loop)
]

# The three sleeps overlap, so this takes about 4 s instead of 1 + 2 + 4 = 7 s
loop.run_until_complete(asyncio.wait(tasks))


for task in tasks:
    print("Task ret:", task.result())

loop.close()
print("Time:", now()-start)

OUTPUT:

Waiting: 1
Waiting: 2
Waiting: 4
Task ret: Done after 1s
Task ret: Done after 2s
Task ret: Done after 4s
Time: 4.004698991775513

Alternatively, it can be written like this (Python 3.7+):

import asyncio


async def func1(i):
    print(f"Coroutine {i} is about to start.")
    await asyncio.sleep(2)
    print(f"Coroutine {i} has finished!")


async def main():
    tasks = []
    for i in range(1, 5):
        # asyncio.create_task() schedules each coroutine on the running loop
        tasks.append(asyncio.create_task(func1(i)))
    await asyncio.wait(tasks)


if __name__ == '__main__':
    asyncio.run(main())

  • asyncio.wait
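Wait for the Futures and coroutine objects given by the sequence futures to complete. Coroutines will be wrapped in Tasks. Returns two sets of Future: (done, pending). (Since Python 3.11, passing coroutine objects directly is no longer allowed; wrap them in Tasks first.)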

The sequence futures must not be empty.

timeout can be used to control the maximum number of seconds to wait before returning. timeout can be an int or float. If timeout is not specified or None, there is no limit to the wait time.

return_when indicates when this function should return: FIRST_COMPLETED, FIRST_EXCEPTION, or ALL_COMPLETED (the default).
  • asyncio.gather
Return a future aggregating results from the given coroutine objects or futures.

All futures must share the same event loop. If all the tasks are done successfully, the returned future’s result is the list of results (in the order of the original sequence, not necessarily the order of results arrival). If return_exceptions is true, exceptions in the tasks are treated the same as successful results, and gathered in the result list; otherwise, the first raised exception will be immediately propagated to the returned future.
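The two return shapes can be compared side by side. A minimal sketch, assuming Python 3.7+ and a simplified do_some_work() with short delays: gather() returns results in argument order even though the second coroutine finishes first, while wait() with return_when=asyncio.FIRST_COMPLETED returns the (done, pending) sets as soon as one task finishes. The leftover task is cancelled so it does not keep running:

```python
import asyncio


async def do_some_work(x):
    await asyncio.sleep(x)
    return f'Done after {x}s'


async def main():
    # gather(): a list of results in argument order, not completion order
    results = await asyncio.gather(do_some_work(0.2), do_some_work(0.1))

    # wait(): takes Tasks and returns two sets, (done, pending)
    tasks = [asyncio.create_task(do_some_work(d)) for d in (0.1, 0.3)]
    done, pending = await asyncio.wait(tasks, return_when=asyncio.FIRST_COMPLETED)
    first = [t.result() for t in done]

    for t in pending:
        t.cancel()  # stop the slower task
    # Wait for the cancellation to finish; CancelledError is collected, not raised
    await asyncio.gather(*pending, return_exceptions=True)
    return results, first, len(pending)


results, first, n_pending = asyncio.run(main())
print(results)    # ['Done after 0.2s', 'Done after 0.1s']
print(first)      # ['Done after 0.1s']
print(n_pending)  # 1
```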

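Binding Callbacks

asyncio supports binding callbacks. Besides binding a callback to a task, the event loop provides three ways to schedule calls to ordinary (non-coroutine) functions:

  1. call_soon: call the function as soon as possible, on the next iteration of the event loop.
  2. call_later: call the function after a given delay, in seconds.
  3. call_at: call the function at a given absolute time, expressed on the event loop's own clock (loop.time()).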

add_done_callback

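When a Task finishes, its callbacks are invoked. The Future object is passed to the callback as its last argument (normally its only one), and inside the callback future.result() returns the coroutine's return value.

add_done_callback() binds a callback to a Task object; once the Task (essentially, the wrapped coroutine) completes, the callback is called.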

import time
import asyncio


now = lambda: time.time()


async def do_some_work(x):
    print("waiting:",x)
    return "Done after {}s".format(x)


def callback(future):
    # future is the finished Task; result() is the coroutine's return value
    print("callback:", future.result())


start = now()
coroutine = do_some_work(2)
loop = asyncio.new_event_loop()

task = asyncio.ensure_future(coroutine, loop=loop)
print(task)

task.add_done_callback(callback)
print(task)

loop.run_until_complete(task)
loop.close()

print("Time:", now()-start)

OUTPUT:

<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13>>
<Task pending coro=<do_some_work() running at /app/py_code/study_asyncio/simple_ex3.py:13> cb=[callback() at /app/py_code/study_asyncio/simple_ex3.py:18]>
waiting: 2
callback: Done after 2s
Time: 0.00039196014404296875
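Because the loop passes only the Future to the callback, extra arguments are usually bound with functools.partial, which is why the Future ends up as the last argument. A minimal sketch, assuming Python 3.7+; the tag 'job-1' and the received list are hypothetical:

```python
import asyncio
import functools

received = []


async def do_some_work(x):
    return f'Done after {x}s'


def callback(tag, future):
    # partial() binds tag; the loop appends the finished future as the last argument
    received.append((tag, future.result()))


async def main():
    task = asyncio.create_task(do_some_work(2))
    task.add_done_callback(functools.partial(callback, 'job-1'))
    await task
    # Done callbacks are scheduled with call_soon, so yield once to be safe
    await asyncio.sleep(0)


asyncio.run(main())
print(received)  # [('job-1', 'Done after 2s')]
```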

call_soon

import threading
import asyncio


def callback(args):
    print(f'callback: {args} ({threading.current_thread()})')


async def my_coroutine(loop):
    print(f"I ain't got no money ({threading.current_thread()})")
    await asyncio.sleep(1)
    # call_soon() schedules a plain function for the next loop iteration
    loop.call_soon(callback, 'first time')
    loop.call_soon(callback, 'second time')
    print(f'Do not go gentle into that good night ({threading.current_thread()})')


if __name__ == '__main__':
    loopEvent = asyncio.new_event_loop()
    # asyncio.wait() needs Tasks, not bare coroutines (enforced since 3.11)
    tasks = [loopEvent.create_task(my_coroutine(loopEvent)) for _ in range(2)]
    loopEvent.run_until_complete(asyncio.wait(tasks))
    loopEvent.close()

call_later

import threading
import asyncio


def callback(args):
    print(f'callback: {args} ({threading.current_thread()})')


async def my_coroutine(loop):
    print(f"I ain't got no money ({threading.current_thread()})")
    # Fires 0.4 s from now, while this coroutine is still sleeping
    loop.call_later(0.4, callback, 'second time')
    loop.call_soon(callback, 'first time')
    await asyncio.sleep(0.5)
    print(f'Do not go gentle into that good night ({threading.current_thread()})')


if __name__ == '__main__':
    loopEvent = asyncio.new_event_loop()
    # asyncio.wait() needs Tasks, not bare coroutines (enforced since 3.11)
    tasks = [loopEvent.create_task(my_coroutine(loopEvent)) for _ in range(2)]
    loopEvent.run_until_complete(asyncio.wait(tasks))
    loopEvent.close()

call_at

import threading
import asyncio


def callback(args):
    print(f'callback: {args} ({threading.current_thread()})')


async def my_coroutine(loop):
    print(f"I ain't got no money ({threading.current_thread()})")
    # Absolute deadline on the loop's own clock: 0.2 s after loop.time()
    loop.call_at(loop.time() + 0.2, callback, 'second time')
    loop.call_soon(callback, 'first time')
    await asyncio.sleep(0.5)
    print(f'Do not go gentle into that good night ({threading.current_thread()})')


if __name__ == '__main__':
    loopEvent = asyncio.new_event_loop()
    # asyncio.wait() needs Tasks, not bare coroutines (enforced since 3.11)
    tasks = [loopEvent.create_task(my_coroutine(loopEvent)) for _ in range(2)]
    loopEvent.run_until_complete(asyncio.wait(tasks))
    loopEvent.close()
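To see the three scheduling methods relative to each other, the sketch below (assuming Python 3.7+; the labels 'soon', 'at' and 'later' are hypothetical) registers them in reverse order of their expected execution and records what actually runs. call_soon fires on the next loop iteration, call_at at loop.time() + 0.1, and call_later after 0.2 s:

```python
import asyncio

order = []


async def main():
    loop = asyncio.get_running_loop()
    loop.call_later(0.2, order.append, 'later')            # relative delay
    loop.call_at(loop.time() + 0.1, order.append, 'at')    # absolute loop time
    loop.call_soon(order.append, 'soon')                   # next iteration
    await asyncio.sleep(0.3)  # keep the loop running until all callbacks fire


asyncio.run(main())
print(order)  # ['soon', 'at', 'later']
```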

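Task Introspection

asyncio supports task introspection: it can return the event loop's current task (current_task) and all of its tasks (all_tasks), which lets the program inspect what the event loop is doing.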

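A Future object is always in one of the following states:

  1. Pending: the initial state. A Task remains pending while its coroutine is running or suspended; there is no separate running state.
  2. Finished (done): a result or an exception has been set.
  3. Cancelled: the Future or Task has been cancelled.

done() returns True in both the finished and the cancelled states.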

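To stop the event loop forcibly, first cancel all the Tasks registered with it. task.cancel() requests cancellation of a task, and asyncio.all_tasks(loop) returns the unfinished Tasks registered with the event loop. (The older classmethod asyncio.Task.all_tasks() was removed in Python 3.9.)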

import asyncio
import time


now = lambda: time.time()


async def do_some_work(x):
    print("Waiting:", x)
    await asyncio.sleep(x)
    return "Done after {}s".format(x)


coroutine1 = do_some_work(1)
coroutine2 = do_some_work(2)
coroutine3 = do_some_work(2)

loop = asyncio.new_event_loop()
tasks = [
    asyncio.ensure_future(coroutine1, loop=loop),
    asyncio.ensure_future(coroutine2, loop=loop),
    asyncio.ensure_future(coroutine3, loop=loop),
]

start = now()
try:
    loop.run_until_complete(asyncio.wait(tasks))
except KeyboardInterrupt:
    # Ctrl+C: print the loop's unfinished tasks, then cancel each of them
    print(asyncio.all_tasks(loop))
    for task in asyncio.all_tasks(loop):
        print(task.cancel())
    # stop() followed by run_forever() runs one more loop iteration,
    # giving the cancelled tasks a chance to process the cancellation
    loop.stop()
    loop.run_forever()
finally:
    loop.close()

print("Time:", now()-start)

OUTPUT:

Waiting: 1
Waiting: 2
Waiting: 2
^C<Task pending coro=<do_some_work() running at test_asyncio.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd39f9c5a08>()]> cb=[_wait.<locals>._on_completion() at /usr/lib64/python3.6/asyncio/tasks.py:380]>, <Task pending coro=<do_some_work() running at test_asyncio.py:10> wait_for=<Future pending cb=[<TaskWakeupMethWrapper object at 0x7fd39f9c59d8>()]> cb=[_wait.<locals>._on_completion() at /usr/lib64/python3.6/asyncio/tasks.py:380]
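On current Python versions the introspection API consists of module-level functions. A minimal sketch, assuming Python 3.7+ (sleeper is a hypothetical coroutine): asyncio.current_task() identifies the running task, asyncio.all_tasks() lists the unfinished ones, and every task other than the current one is cancelled and then awaited so the cancellation completes:

```python
import asyncio


async def sleeper():
    try:
        await asyncio.sleep(10)
    except asyncio.CancelledError:
        # Cleanup could go here; re-raise so the task ends up cancelled
        raise


async def main():
    me = asyncio.current_task()
    tasks = [asyncio.create_task(sleeper()) for _ in range(2)]
    await asyncio.sleep(0)  # let both sleepers start
    others = [t for t in asyncio.all_tasks() if t is not me]
    for t in others:
        t.cancel()
    # Collect the CancelledErrors instead of propagating them
    await asyncio.gather(*others, return_exceptions=True)
    return len(others), [t.cancelled() for t in tasks]


count, states = asyncio.run(main())
print(count, states)  # 2 [True, True]
```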
