Python asyncio之协程学习总结

Posted 授客的博客

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python asyncio之协程学习总结相关的知识,希望对你有一定的参考价值。

实践环境

Python 3.6.2

什么是协程

协程(Coroutine)一种电脑程序组件,该程序组件通过允许暂停和恢复任务,为非抢占式多任务生成子程序。协程也可以简单理解为协作的程序,通过协同多任务处理实现并发的函数的变种(一种可以支持中断的函数)。

下面,我们通过日常生活场景为例,对什么是协程进行说明。

假设A某在家每天都要做3件事:洗衣服(使用洗衣机),蒸饭(使用电饭煲),扫地(使用扫地机器人),这三样电器在完成任务后都会发出不一样响声来告诉A某事情已经完成。

这里,暂且假设A某智商有问题,每次都是严格按顺序做这三件事:先洗完衣服,再把饭蒸好,最后才开始扫地。

接下来,我们用一段简单的代码来模拟上述整个过程,并记录整个过程的耗时,其中使用了3个简单的普通函数,分别模拟上述3件事情,如下:

import time
from datetime import datetime


def do_washing():
    print(datetime.now(), \':开始洗衣服\')
    time.sleep(3)  # 洗衣服 # 用程序休眠来模拟过程,且别计较时间大小
    print(datetime.now(), \':通知A某衣服洗好了\') 


def steame_rice():
    print(datetime.now(), \':开始蒸饭\')
    time.sleep(5)  # 蒸饭
    print(datetime.now(), \':通知A某饭蒸好了\')


def do_clearing():
    print(datetime.now(), \':开始扫地\')
    time.sleep(2)  # 扫地
    print(datetime.now(), \':通知A某地扫完了\')


if __name__ == \'__main__\':
    startTime = time.time()
    do_washing()
    steame_rice()
    do_clearing()
    endTime = time.time()
    print("扫地+蒸饭+洗衣服总耗时: ", endTime - startTime)

程序输出:

2023-04-09 23:33:50.001204 :开始洗衣服
2023-04-09 23:33:53.002765 :衣服洗好了
2023-04-09 23:33:53.002765 :开始蒸饭
2023-04-09 23:33:58.013337 :通知A某饭蒸好了
2023-04-09 23:33:58.013337 :通知A某开始扫地
2023-04-09 23:34:00.024784 :通知A某地扫完了
扫地+蒸饭+洗衣服总耗时:  10.023579835891724

直到有一天,A某的朋友来他家做客,体验到他的“高效”办事效率后,建议他不用等每件事情都做完才做下一件事情。A某听后,虚心采纳,并告诉自己要开始培养新的习惯。

第二天开始呢,A某开始改变自己,把衣服扔洗衣机,并启动机洗程序后,就去淘米蒸饭了,等电饭煲开始蒸饭后,就去清扫地板了。

接下来,我们对上述代码进行稍微修改,以便模拟上述过程,并记录整个过程的耗时,如下:

import time
from datetime import datetime
import asyncio

async def do_washing():
    print(datetime.now(),\':开始洗衣服\')
    await asyncio.sleep(3)
    print(datetime.now(),\':通知A某衣服洗好了\')

async def do_clearing():
    print(datetime.now(), \':开始扫地\')
    await asyncio.sleep(5)
    print(datetime.now(), \':通知A某地扫完了\')

async def steame_rice():
    print(datetime.now(), \':开始蒸饭\')
    await asyncio.sleep(2)
    print(datetime.now(), \':通知A某饭蒸好了\')

tasks = [
    do_washing(),
    steame_rice(),
    do_clearing()
]

if __name__ == \'__main__\':
    loop = asyncio.get_event_loop()
    start_time = time.time()
    loop.run_until_complete(asyncio.wait(tasks))
    loop.close()
    end_time = time.time()
    print("扫地+蒸饭+洗衣服总耗时: ", end_time - start_time)

程序输出:

2023-04-09 23:35:17.422790 :开始扫地
2023-04-09 23:35:17.422790 :开始蒸饭
2023-04-09 23:35:17.422790 :开始洗衣服
2023-04-09 23:35:19.427500 :通知A某饭蒸好了
2023-04-09 23:35:20.427813 :通知A某衣服洗好了
2023-04-09 23:35:22.429780 :通知A某地扫完了
扫地+蒸饭+洗衣服总耗时:  5.0069899559021

不得不夸A某进步真大,相比之前,这次耗时减少了近一半。

以上这段代码就是协程的简单实现,充分体现了协程的3个特点:

  1. 多任务并行:A某同时完成了3项任务--分别代表3个协程。
  2. 异步任务:3项任务中,没有一项是需要A某在一旁一直看着直到做完的,每项任务开启后,A某都可以离开去做别的任务。
  3. 协作式(非抢占式):每项任务能否“占用”A某,取决于A某是否正被其它任务“占用”,即是否有任务主动“让出”A某,不是靠“抢占”,更像是协商。

有了线程为啥还要协程?

协程是用户视角的一种抽象,操作系统并没有这个概念,其主要思想是在用户态实现调度算法,用少量线程完成大量任务的调度。

相对线程而言,协程具备以下优势:

  • 减少内存占用

    协程的创建成本远小于线程,可以设计得很小,小到KB级别,大大降低内存占用。所以,内存资源有限的情况下,可以创建更多协程,从而实现更高的并发。

  • 减少上下文切换开销,节约CPU资源

如上图,线程之间的切换请求,由系统内核来实现,而协程之间的切换,则可由用户自由控制,即交由用户态的代码来完成,极大程度避免了系统内核级线程上下文切换造成的CPU资源浪费。具体实现思路如下:

  1. 尽量减少可执行的线程,这样切换次数必然会少

  2. 让线程尽可能的处于运行状态,而不是阻塞让出时间片

一个线程可以拥有多个协程,主要注意的是,一个线程内的多个协程却是串行的,无论CPU有多少个核,因为协程本质上还是一个函数,当一个协程运行时,其它协程必须挂起。实际开发过程中,可以使用协程在将一些耗时的IO操作异步化,例如写文件、耗时IO请求等来提升程序执行效率。

相关语法说明

接下来,就上面的例子,对协程相关语法进行说明。

async def do_washing()

使用async def语法定义协程函数do_washing

协程函数示例:

async def func(param1, param2):
    do_stuff()
    await some_coroutine()

注意:

  1. 使用async def语法定义的函数始终是协程函数,即使它们不包含waitasync关键字。

  2. 采用传统的函数调用方式,直接调用协程函数,函数不会被立即执行,会产生类似RuntimeWarning: coroutine \'xxxx协程函数\' was never awaited的告警日志,并返回一个协程对象。仅运行事件循环时才会运行协程。

  3. await 挂起当前协程以等待一个可等待(awaitable)对象--协程函数或者实现了__await__()的对象,直到可等待对象返回结果。可以将这个可等待对象,简单的理解为待执行的异步任务(一般是比较耗时的任务,比如开篇示例中用作比拟的煲饭)。

    注意:

    1. await只能在协程函数内部使用。

    2. 程序遇到await关键词时,会将程序控制权交给主程序,由主程序分配给其它协程。当可等待对象返回结果,并且此时程序控制权还被其它协程占用时,则被挂起的协程依旧无法继续往下运行,直到获取程序控制权。关于这个结论,可用下述示例代码进行验证:

      from datetime import datetime
      import asyncio
      
      async def do_washing():
          print(datetime.now(),\':开始洗衣服\')
          await asyncio.sleep(0.5)
          for i in range(10000):
              if i % 4000 == 0:
                  print(\'洗衣服\')
          print(datetime.now(),\':衣服洗好了\')
      
      async def do_cooking():
          print(datetime.now(), \':开始煲饭\')
          for i in range(100000):
              if i%20000 == 0:
                  print(\'煲饭\')
          await asyncio.sleep(5)
          print(datetime.now(), \':饭煲好了\')
      
      tasks = [
          do_cooking(),
          do_washing()
      ]
      
      if __name__ == \'__main__\':
          loop = asyncio.get_event_loop()
          loop.run_until_complete(asyncio.wait(tasks))
          loop.close()
      

      输出:

      2023-04-10 23:53:37.804727 :开始洗衣服
      2023-04-10 23:53:37.804727 :开始煲饭
      煲饭
      煲饭
      煲饭
      煲饭
      煲饭
      洗衣服
      洗衣服
      洗衣服
      2023-04-10 23:53:38.310586 :衣服洗好了
      2023-04-10 23:53:42.811876 :饭煲好了
      
asyncio.sleep(2)

给定秒数后完成的协程--阻塞指定的秒数。sleep函数还可以指定result参数,协程完成时将该参数值返回给调用者(默认返回None),如下:

result = await asyncio.sleep(0.5, result=\'task done\')
print(result) # 输出:task done

sleep总是会挂起当前任务,以允许其他任务运行。可以利用这个特性,将秒数设置为0,即asyncio.sleep(0),以便提供一个经优化的路径以允许其他任务运行。 这可供长时间运行的函数使用,避免调用该函数时阻塞事件循环。

asyncio.get_event_loop()

为当前上下文获取事件循环(event loop),返回一个实现了AbstractEventLoop接口的事件循环对象。如果没有为当前上下文设置任何事件循环,且当前策略没有指定创建一个事件循环,则抛出异常。必须返回非None值。

AbstractEventLoop.run_until_complete(asyncio.wait(tasks))

运行直到asyncio.wait(tasks)运行完成。返回asyncio.wait(tasks)的运行结果,或者抛出异常。

asyncio.run(coro, *, debug=False)

执行协程 coro 并返回结果。

此函数会运行传入的协程,负责管理 asyncio 事件循环,终结异步生成器,并关闭线程池。

当有其他 asyncio 事件循环在同一线程中运行时,此函数不能被调用。

如果debugTrue,事件循环将以调试模式运行。

此函数总是会创建一个新的事件循环并在结束时关闭之。它应当被用作 asyncio 程序的主入口点,理想情况下应当只被调用一次。

示例:

async def main():
    await asyncio.sleep(1)
    print(\'hello\')

asyncio.run(main())

3.7 新版功能.

asyncio.wait(tasks)

具备完整参数列表的wait函数定义如下

asyncio.wait(fs, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

并发地运行 fs可迭代对象中的可等待对象,并进入阻塞状态直到满足return_when参数所指定的条件(缺省参值为ALL_COMPLETED)。

注意,aws参数不能为空。

函数返回 Future 集合: (done, pending)

请注意,此函数不会引发 asyncio.TimeoutError。当超时发生时,未完成的 Future 将在指定秒数后被返回。

return_when 指定此函数应在何时返回,可选值如下:

  • FIRST_COMPLETED

    函数将在任意可等待对象结束或取消时返回。

  • FIRST_EXCEPTION

    函数将在任意可等待对象因引发异常而结束时返回。当没有引发任何异常时它就相当于ALL_COMPLETED

  • ALL_COMPLETED

    函数将在所有可等待对象结束或取消时返回。

其它协程示例

示例:Hello world携程

import asyncio

async def hello_world():
    print("Hello World!")
    
    return \'hello world\'

# print(hello_world()) # RuntimeWarning: coroutine \'hello_world\' was never awaited #<coroutine object compute at 0x000001B6265F08E0> 

loop = asyncio.get_event_loop()
# Blocking call which returns when the hello_world() coroutine is done
res = loop.run_until_complete(hello_world()) # 把协程对象传递给事件循环
print(res) # 输出:hello world
loop.close()

python3.7版本,也可以使用新API asyncio.run来简化代码

import asyncio

async def hello_world():
    print("Hello World!")
    
    return \'hello world\'
    
asyncio.run(hello_world())

示例:显示当前日期

使用sleep()函数在5秒内每1秒显示一次当前日期的协程示例

import asyncio
import datetime

async def display_date(loop):
    end_time = loop.time() + 5.0
    while True:
        print(datetime.datetime.now())
        if (loop.time() + 1.0) >= end_time:
            break
        await asyncio.sleep(1)

loop = asyncio.get_event_loop()
# Blocking call which returns when the display_date() coroutine is done
loop.run_until_complete(display_date(loop))
loop.close()

示例: 链式协程(Chain coroutines)

import asyncio

async def compute(x, y):
    print("Compute %s + %s ..." % (x, y))
    await asyncio.sleep(1.0)
    return x + y

async def print_sum(x, y):
    result = await compute(x, y)
    print("%s + %s = %s" % (x, y, result))

loop = asyncio.get_event_loop()
loop.run_until_complete(print_sum(1, 2))
loop.close()

compute()被链接到print_sum()print_sum()协程等待compute()完成后再返回结果

示例的序列图

“Task”是由AbstractEventLoop.run_until_complete()方法在获取协程对象而不是任务时创建的。

该图显示了控制流程,但并没有确切描述事物内部是如何工作的。例如,sleep协程创建了一个内部future,它使用AbstractEventLoop.call_later()在1秒内唤醒任务。

可等待对象

整体而言,python协程的可等待对象包含协程函数或者实现了__await__()的对象,常见的可等待对象包含以下几种:

  1. 使用async def定义的协程函数

  2. Task对象,比如使用 asyncio.create_task()asyncio.ensure_future() 创建的任务对象。

  3. Future对象,比如使用 asyncio.Future() 创建的对象。

Future

Future,是对协程的封装,代表一个异步操作的最终结果--将来执行或没有执行的任务的结果,其值会在将来被计算出来。

class asyncio.Future(*, loop=None)

该类基本兼容concurrent.futures.Future

差别:

  • result()exception()不接受超时参数,并且在future尚未完成时引发异常。
  • 总是通过事件循环的call_soon_threadsafe()调用使用add_done_callback()注册的回调。
  • 该类与concurrent.futures包中的wait()as_completed()函数不兼容。

该类不是线程安全的。

类方法

  • cancel()
    取消future并安排执行回调

    如果future已经完成或者取消,则返回False。否则,修改future的状态为已取消,并安排执行回调,并返回True

  • cancelled()

    如果future已取消则返回True

  • done()
    如果future已完成则返回True

    已完成意味着可获取结果或者异常,或者future已被取消。

  • result()

    返回future呈现的结果。

    如果future已被取消,则引发CancelledError。如果future的结果还不可获取,则会引发InvalidStateError。如果future已完成并且存在异常,则该异常会被抛出。

  • exception()
    返回给future设置的异常。

    只有在future完成时,才会返回异常(如果未设置异常,则返回None)。如果future已被取消,则引发CancelledError。如果future尚未完成,则会引发InvalidStateError

  • add_done_callback(fn)

    添加一个回调,以便在future完成时运行。

    使用一个future对象作为参数调用回调。如果调用时,future已经完成,则使用call_soon()调用回调。

    使用functools.partial将参数传递给回调。例如

    fut.add_done_callback(functools.partial(print, "Future:", flush=True)) 将调用print("Future:", fut, flush=True)

  • remove_done_callback(fn)

​ 从“call when done”列表中删除回调的所有实例。

​ 返回已删除的回调数。

  • set_result(result)
    标记future为已完成并设置其结果。

    如果调用此方法时future已完成,则会引发InvalidStateError

  • set_exception(exception)

    标记future为已完成并设置一个异常。

    如果调用此方法时future已完成,则会引发InvalidStateError

例子: Future配合run_until_complete()的使用

import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result(\'Future is done!\')

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
loop.run_until_complete(future)
print(future.result()) # Future is done!
loop.close()

协程函数负责计算(耗时1秒),并将结果存储到future。run_until_complete()方法等待future的完成。

注意:
run_until_complete() 方法在内部使用add_done_callback()方法,以便在future完成时得到通知。

Future类封装了可调用对象的异步执行

示例:Future配合run_forever()的使用

可以使用Future.add_done_callback()方法以不同的方式编写前面的示例,以明确描述控制流:

import asyncio

async def slow_operation(future):
    await asyncio.sleep(1)
    future.set_result(\'Future is done!\')

def got_result(future):
    print(future.result())
    loop.stop()

loop = asyncio.get_event_loop()
future = asyncio.Future()
asyncio.ensure_future(slow_operation(future))
future.add_done_callback(got_result)
try:
    loop.run_forever()
finally:
    loop.close()

在本例中,future用于将slow_operation()链接到got_result():当slow_ooperation()完成时,将调用got_resull()获取结果

Task

class asyncio.Task(coro, *, loop=None)
安排协程的执行:将其封装在future。Task是Future的一个子类。

task负责在事件循环中执行协程。如果封装的协程由future生成,则task将阻塞执行封装的协程并等待future的完成。当future完成并返回结果或者异常,封装的协程的执行将重新开始,并检索future的结果或异常。

事件循环使用协作调度:一个事件循环一次只运行一个task。如果其他事件循环在不同的线程中运行,则其他task可以并行运行。当task等待future完成时,事件循环会执行一个新task。

取消一项task和取消一个future是不同的。调用cancel()将向封装的协程抛出CancelledError。仅当封装的协程没有捕获CancelledError异常或抛出CancelledError异常时,cancelled()才会返回True

如果一个挂起的task被销毁,则其封装的协程不会被执行完。这可能是一个bug,并记录一条警告:

Task was destroyed but it is pending!
task: <Task pending coro=<kill_me() done, defined at test.py:5> wait_for=<Future pending cb=[Task._wakeup()]>>

不要直接创建Task实例:使用ensure_future()函数或AbstractEventLoop.create_task()方法。

这个类不是线程安全的。

类方法

  • all_tasks(loop=None)
    返回给定事件循环的所有任务集。默认返回当前事件循环的所有任务。

  • current_task(loop=None)
    返回给定事件循环中当前正在运行的任务。默认返回当前事件循环中的当前任务。

    不在Task上下文中调用该函数时返回None

  • cancel()
    请求取消任务

    安排在事件循环的下一个循环中将CancelledError抛出到封装的协程中。然后,协程有机会使用try/except/finally清理甚至拒绝请求。

    Future.cancel()不同,这并不能保证task会被取消:异常可能会被捕获并采取行动,从而延迟task的取消或完全阻止取消。该task也可能返回一个值或抛出一个不同的异常。

    调用此方法后,cancelled()将不会立即返回True(除非任务已被取消)。当封装的协程以CancelledError异常终止时,task将被标记为已取消(即使未调用cancel())。

  • get_stack(*, limit=None)

    返回此任务的协程的堆栈帧列表。

    如果协程没有完成,则返回它被挂起的堆栈。如果协同程序已成功完成或被取消,则返回一个空列表。如果协同程序被异常终止,则返回traceback帧列表。

    堆栈帧总是按从旧到新的顺序排列。

    可选limit给出了要返回的最大帧数;默认情况下,将返回所有可获取的帧。它的含义因返回堆栈还是trackback而不同:返回堆栈的最新帧,但返回traceback的最旧帧(这与traceback模块的行为相符)。

    由于我们无法控制的原因,对于挂起的协程,只返回一个堆栈帧。

  • print_stack(*, limit=None, file=None)

    打印此任务的协程的堆栈或traceback。

    get_stack()检索的帧生成类似于traceback模块的输出。limit参数被传递给get_stack()file参数为I/O流,输出将写入该流;默认情况下,输出写入sys.stderr

示例:并行执行task

并行执行3个task (A, B, C)

import asyncio

async def factorial(name, number):
    f = 1
    for i in range(2, number+1):
        print("Task %s: Compute factorial(%s)..." % (name, i))
        await asyncio.sleep(1)
        f *= i
    print("Task %s: factorial(%s) = %s" % (name, number, f))

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.gather(
    factorial("A", 2),
    factorial("B", 3),
    factorial("C", 4),
))
loop.close()

输出:

Task B: Compute factorial(2)...
Task C: Compute factorial(2)...
Task A: Compute factorial(2)...
Task B: Compute factorial(3)...
Task C: Compute factorial(3)...
Task A: factorial(2) = 2
Task B: factorial(3) = 6
Task C: Compute factorial(4)...
Task C: factorial(4) = 24

task在创建时会自动被安排执行。事件循环将在所有task完成后停止。

Task函数

注意:

在下面的函数中,可选的循环参数允许显式设置底层task或协程使用的事件循环对象。如果没有提供,则使用默认的事件循环

  • asyncio.as_completed(fs, *, loop=None, timeout=None)

    返回一个迭代器,该迭代器在等待时为Future实例。

    如果在所有Future完成之前发生超时,则引发asyncio.TimeoutError

    示例:

    for f in as_completed(fs):
        result = yield from f  # The \'yield from\' may raise
        # Use result
    

    注意:
    future f不一定是fs的成员

  • asyncio.ensure_future(coro_or_future, *, loop=None)

    安排协程对象的执行:在其封装在Future中。返回一个Task对象。

    如果参数是Future,则直接返回。

    版本3.4.4中新增

    版本3.5.1变更: 函数接受任何可等待对象。

  • asyncio.async(coro_or_future, *, loop=None)
    废弃的ensure_future()的别名

    版本 3.4.4开始废弃

  • asyncio.wrap_future(future, *, loop=None)

    concurrent.futures.Future对象封装在Future对象中。

  • asyncio.gather(*coros_or_futures, loop=None, return_exceptions=False)

    返回来自给定协程对象或future的future聚合结果。

    所有future必须共享相同的事件循环。如果所有task都成功完成,那么返回的future结果就是结果列表(按照原始序列的顺序,不一定是结果到达的顺序)。如果return_exceptions为true,则task中的异常将被视为成功的结果,并收集在结果列表中;否则,第一个抛出的异常将立即传递给返回的future。

    取消:如果外部Future被取消,则所有子项(尚未完成)也将被取消。如果任何子项被取消,这将被视为引发CancelledError错误——在这种情况下,外部Future不会被取消。(这是为了防止取消一个子项而导致其他子项被取消。)

  • asyncio.iscoroutine(obj)
    如果obj是一个协程对象,该对象可能基于生成器或async def协程,则返回True

  • asyncio.iscoroutinefunction(func)
    如果func被判断为协程函数,则返回True,协程函数可以是被修饰的生成器函数或async def函数。

  • asyncio.run_coroutine_threadsafe(coro, loop)

    向给定的事件循环提交一个协程对象。

    返回concurrent.futures.Future以访问结果。

    该函数被从不同于运行事件循环线程的线程调用。用法:

    # Create a coroutine
    coro = asyncio.sleep(1, result=3)
    
    # Submit the coroutine to a given loop
    future = asyncio.run_coroutine_threadsafe(coro, loop)
    
    # Wait for the result with an optional timeout argument
    assert future.result(timeout) == 3
    

    如果在协程中引发异常,则会通知返回的future。它还可以用于取消事件循环中的task:

    try:
        result = future.result(timeout)
    except asyncio.TimeoutError:
        print(\'The coroutine took too long, cancelling the task...\')
        future.cancel()
    except Exception as exc:
        print(\'The coroutine raised an exception: !r\'.format(exc))
    else:
        print(\'The coroutine returned: !r\'.format(result))
    

    注意:
    与模块中的其他函数不同,run_coroutine_threadsafe() 要求显式传递loop参数。

    版本3.5.1中新增

  • coroutine asyncio.sleep(delay, result=None, *, loop=None)

    创建一个给定秒数后完成的协程--阻塞指定的秒数。sleep函数还可以指定result参数,协程完成时将该参数值返回给调用者(默认返回None

  • asyncio.shield(arg, *, loop=None)
    等待future,保护它不被取消。

    语句:

    res = yield from shield(something())
    

    等价于:

    res = yield from something()
    

    除非包含它的协程被取消,否则在something()中运行的任务不会被取消。从something()的视角来看,并没法生取消。但是它的调用者仍然被取消,所以yield from表达式仍然会引发CancelledError。注意:如果通过其他方式取消了something(),这仍然会取消shield()

    如果你想完全忽略取消(cancellation,不推荐),你可以将shield()try/except子句结合使用,如下所示:

    try:
        res = yield from shield(something())
    except CancelledError:
        res = None
    
  • coroutine asyncio.wait(futures, *, loop=None, timeout=None, return_when=ALL_COMPLETED)

    等待futures序列参数给定的Future和协程对象执行完成。协程将被封装在task中。返回两个Future集:(done,pending)

    • futures序列参数不能为空。

    • timeout参数可用于控制返回前等待的最大秒数。timeout可以是intfloat类型。如果未指定timeout参数或参数值为空,则没有等待时间限制,即永不超时。

    • return_when指示此函数何时返回。它必须是concurrent.futures模块的以下常量之一:

      • FIRST_COMPLETED 当任何future完成或被取消时,函数将返回。
      • FIRST_EXCEPTION 当任何future因为引发异常而结束时,函数将返回。如果没有future引发异常,那么它相当于ALL_COMPLETED
      • ALL_COMPLETED当所有future结束或被取消时,函数将返回。

    这个函数是一个协程。

    用法:

    done, pending = yield from asyncio.wait(fs)
    

    注意
    这不会引发asyncio.TimeoutErrorpending集合中存放的是发生超时时未完成的future。

  • coroutine asyncio.wait_for(fut, timeout, *, loop=None)

    等待单个future或协程对象完成直到发生超时(如果超时限制的话)。如果timeoutNone,则一直等待直到future完成。

    协程将被封装在Task中。

    函数返回Future或协同程序的结果。当发生超时时,将取消task并抛出asyncio.TimeoutError。为了避免任务取消,请将其封装在shield()中。

    如果取消wait,那么future fut也将被取消。

    该函数为一个协程,用法:

    result = yield from asyncio.wait_for(fut, 60.0)
    

参考连接:

https://www.shuzhiduo.com/A/gGdXlLmmd4/

Python学习:python并发编程之协程

本节内容:

  1.协程介绍。

  2.回顾yield

  3.Greenlet

  4.Gevent介绍

  5.Gevent之同步与异步

  6.Gevent之应用举例一

  7.Gevent应用举例二


一.引子

    本节的主题是基于单线程来实现并发,即只用一个主线程(很明显可利用的cpu只有一个)情况下实现并发,为此我们需要先回顾下并发的本质:切换+保存状态

    cpu正在运行一个任务,会在两种情况下切走去执行其他的任务(切换由操作系统强制控制),一种情况是该任务发生了阻塞,另外一种情况是该任务计算的时间过长

技术分享

ps:在介绍进程理论时,提及进程的三种执行状态,而线程才是执行单位,所以也可以将上图理解为线程的三种状态 

    其中第二种情况并不能提升效率,只是为了让cpu能够雨露均沾,实现看起来所有任务都被“同时”执行的效果,如果多个任务都是纯计算的,这种切换反而会降低效率。为此我们可以基于yield来验证。yield本身就是一种在单线程下可以保存任务运行状态的方法,我们来简单复习一下:

#1 yiled可以保存状态,yield的状态保存与操作系统的保存线程状态很像,但是yield是代码级别控制的,更轻量级
#2 send可以把一个函数的结果传给另外一个函数,以此实现单线程内程序之间的切换  
技术分享
#串行执行
import time
def consumer(res):
    ‘‘‘任务1:接收数据,处理数据‘‘‘
    pass

def producer():
    ‘‘‘任务2:生产数据‘‘‘
    res=[]
    for i in range(10000000):
        res.append(i)
    return res

start=time.time()
#串行执行
res=producer()
consumer(res)
stop=time.time()
print(stop-start) #1.5536692142486572



#基于yield并发执行
import time
def consumer():
    ‘‘‘任务1:接收数据,处理数据‘‘‘
    while True:
        x=yield

def producer():
    ‘‘‘任务2:生产数据‘‘‘
    g=consumer()
    next(g)
    for i in range(10000000):
        g.send(i)

start=time.time()
#基于yield保存状态,实现两个任务直接来回切换,即并发的效果
#PS:如果每个任务中都加上打印,那么明显地看到两个任务的打印是你一次我一次,即并发执行的.
producer()

stop=time.time()
print(stop-start) #2.0272178649902344
单纯地切换反而会降低运行效率

 


以上是关于Python asyncio之协程学习总结的主要内容,如果未能解决你的问题,请参考以下文章

高性能编程之协程--------asyncio

Python学习:python并发编程之协程

第十二篇:多任务之协程

13.并发编程之协程

python之协程

Java之协程(quasar)