Python 协程(Coroutine)体验

Posted backofhan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python 协程(Coroutine)体验相关的知识,希望对你有一定的参考价值。

概述

本文通过运行一段Python小程序,模拟一个真实的任务。比较在多线程(Multi-thread)和在多协程(Coroutine)环境下的编程实现。发现和解释一些有趣的现象。以期为大家带来一些对协程的直观感受,加深对这种新鲜事物的理解。

Python中的协程

协程(coroutine)是一个有很长历史的概念,它是计算机程序的一类组件,推广了协作式多任务的子程序。其详细的概念和历史请参照维基百科中的条目:https://en.wikipedia.org/wiki...
Python天生支持的生成器(generator)其实就是协程的一种实现,生成器允许执行被挂起与被恢复。但是由于缺乏更多语法上的支持,以及缺乏利用生成器实现异步编程的成熟模式,限制了生成器作为协程参与协作式多任务编程的用途。不过现在情况发生了改变,Python自3.6版本开始添加了async/await的语法直接支持协程的异步编程,同时在asyncio库中提供了协程编程的接口以及必要的基础实现。社区也在不断努力为现有的IO库提供异步的版本以便用于协程开发环境,例如http client目前至少在aiohttp以及tornado中都提供了可用于协程的异步版本。我们知道IO操作天生是异步的,为了适应广泛应用的传统的同步编程模式,很多的IO库都采用阻塞调用者的方式来实现同步。这样虽然简化了编程,可也带来了并行度不高的问题。在一些有大量耗时IO操作的环境里,高层应用不得不忍受串行操作造成的漫长等待,或是转向多进程(Multi-Process)多线程编程以期提高并行程度。而多进程多线程编程又会引入争用、通讯,同步、保护等棘手的问题。而且我们知道即使是作为轻量级的线程也会对应一个独立的运行栈。线程的调度和切换不可避免地包括运行栈的切换和加载。如果在一个进程中有成百上千的线程,那么相应的调度开销会急剧上升到难以忍受的程度。而且线程之间的同步和互锁也将成为一个噩梦。除去boss级别的死锁问题,其他任何的bug或是缺陷在多线程环境下都难于重现和追踪,这是因为线程的调度有很大的随机性。

一个Python小程序

下面是一个Python的小程序,可以在Python3.8或者更新的版本上运行。

import threading
import time
import asyncio


def gen():
    s = 0
    while s < 1000:
        yield s
        s += 1


def unsafe_thread_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            time.sleep(0.01)
            t += v
    except StopIteration:
        print(f" {t} ")


async def wrong_coroutine_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            time.sleep(0.01)
            t += v
    except StopIteration:
        print(f" {t} ")
        

async def starter_with_wrong_workers():
    tasks = []
    for _ in range(10):
        task = asyncio.create_task(wrong_coroutine_worker(g))
        tasks.append(task)
    await asyncio.gather(*tasks)
    

async def right_coroutine_worker(g):
    t = 0
    try: 
        while True:
            v = next(g)
            await asyncio.sleep(0.01)
            t += v
    except StopIteration:
        print(f" {t} ")
        

async def starter_with_right_workers():
    tasks = []
    for _ in range(10):
        task = asyncio.create_task(right_coroutine_worker(g))
        tasks.append(task)
    await asyncio.gather(*tasks)


if __name__ == \'__main__\':
    
    print(\'----------------- Sequence  -----------------\')
    g = gen()
    started_at = time.monotonic()
    t = 0
    for v in g:
        time.sleep(0.01)
        t += v
    print(t)
    total_time = time.monotonic() - started_at
    print(f\'total time consumed: {total_time:.2f} seconds\')
    
    print(\'----------------- Unsafe threading  -----------------\')
    g = gen()
    started_at = time.monotonic()
    threads =[]
    for _ in range(10):
        w = threading.Thread(target=unsafe_thread_worker, args=[g])
        w.start()
        threads.append(w)
    for w in threads:
        w.join()
    total_time = time.monotonic() - started_at
    print(f\'total time consumed: {total_time:.2f} seconds\')

    print(\'----------------- Async with wrong coroutine  -----------------\')
    g = gen()
    started_at = time.monotonic()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(starter_with_wrong_workers())
    total_time = time.monotonic() - started_at
    print(f\'total time consumed: {total_time:.2f} seconds\')
            
    print(\'----------------- Async with right coroutine  -----------------\')
    g = gen()
    started_at = time.monotonic()
    loop = asyncio.get_event_loop()
    loop.run_until_complete(starter_with_right_workers())
    total_time = time.monotonic() - started_at
    print(f\'total time consumed: {total_time:.2f} seconds\')

一个典型的运行输出看起来像是这个样子的:

----------------- Sequence  -----------------
499500
total time consumed: 10.53 seconds
----------------- Unsafe threading  -----------------
 49804  49609 

 50033 
 49682 
 49574 
 50005 
 50143 
 50069  50219 

 50362 
total time consumed: 1.09 seconds
----------------- Async with wrong coroutine  -----------------
 499500 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
 0 
total time consumed: 10.55 seconds
----------------- Async with right coroutine  -----------------
 49500 
 49967 
 49973 
 50100 
 49965 
 50000 
 49968 
 49963 
 49964 
 50100 
total time consumed: 1.16 seconds

这个小程序实际上模拟了一个常见的真实任务。设想我们通过一个http的数据API一页一页地获取一个比较大的数据集。每页数据通过一个带有页号或是起始位置的URL予以标识,然后通过向API服务器发送一个http request,并解析返回的http response中所包含的数据。其中的http访问显然是一个耗时的IO操作。返回数据的解析和处理是一个计算密集型的操作,相比IO等待,其消耗的时间不值一提。那个生成器gen可以看作是一个数据页面URL的生成器,也就是任务生成器。然后我们使用sleep来模拟一个耗时的IO操作,使用加法来模拟数据的合并与分析。你也可以把这个小程序想象成为一个网络爬虫,我们在一个全局的列表里保存了所有目标网站的地址,然后或串行或并行地访问所有的目标,取回我们感兴趣的数据存储并合并分析。
总之,我们有1000个比较独立的小任务。由于任务之间没有依赖性,所以多个任务是可以并行执行的。每个任务又分为领取并明确任务,获取数据(这是一个耗时0.01秒的IO操作),返回数据的存储和处理几个步骤。在一个任务内部,各个步骤间均有强烈依赖,不能并行执行。
现在让我们来看看主函数,其代码分为4段,分别对应了4种不同的实现方法。方法一是最为传统的串行方式,通过一个简单的循环,一个一个地获取并完成任务,在一个任务完成后再领取下一个任务。不出所料,由于IO操作是主要的耗时操作,串行执行的时间等于每个任务耗时的总和,0.01*1000 = 10秒。方法二使用了多线程,模拟了一个有10个线程的线程池,池中的每个线程均独立地像方法一那样工作。由于所有的线程都是并行运行的,所以总的耗时几乎是串行方法的1/10。方法三使用了协程,也是模拟了一个有10个协程的协程池,可是由于使用了错误的IO操作,导致多个协程事实上不能并行执行,其总的耗时和方法一相当,我们稍后会仔细分析比较。方法四修正了方法三的错误,使得协程能够并行运行,其总耗时与方法二相当。
现在来看看输出的结果。方法一输出499500,为0到999一千个数的和(sum(range(1000)) == 499500)。方法二的输出比较乱,10个数没有分别输出到10行上,这是因为标准输出是一个在线程间共享的资源,而print()方法不是线程安全的,从输出来看,在打印出一个数,和其后打印一个换行符之间,线程很可能发生切换,导致输出混乱。其实那个分发任务的生成器gen也不是线程安全的,如果它的调用比较耗时,在线程间发生争用,就会抛出ValueError: generator already executing。只是在本例中,gen的调用很快,不太能遇到争用的情况。如果我们在yield s前加一行打印的语句,不论打印什么,都会立即看到上述异常,读者可以自行测试。方法三的输出明确显示了只有第一个协程在工作,它完成了所有的任务,并且以完全串行的方式运行。当其他协程终于得以运行的时候,已经没有剩下的任务了。这是由于那个IO语句time.sleep()是一个同步的IO操作,它不会导致当前协程被挂起从而让别的协程得到运行机会,相反的,同步的IO直接阻塞(Block)当前线程,导致所有在当前线程内的协程均无法获得运行机会。方法四只是将同步IO替换为异步的asyncio.sleep()(也是使用协程实现的)就解决了协程并行的问题。
让我们把注意力集中到代码中与协程相关的部分,来深入地了解一下协程本身。首先在代码里所有由async def定义的函数都是协程函数,这是一个语法糖,其实质就是一个generator函数。同generator函数一样,直接的函数调用不会导致其代码被运行,而是生成了一个协程的实例对象(就是一个generator实例)要跑动代码必须通过与generator同样的方法,调用实例的send(msg)或是next()(next等同于send(None))方法。我们知道generator通过yield退出并挂起,等待下一次的send或next调用激活并继续运行。async def定义的协程中的return或是await都可以理解为一个yield。其中return返回的值将作为本次send或next调用的返回值,生成一个消息放到消息循环队列里(event queue),该消息被消息循环(event loop)处理的时候将会激活另一个等待(await)该消息的协程。之后return的协程将控制交还消息循环,并且从此不会再有关于return协程的消息加入到消息循环队列,于是该协程实例不会再有运行的机会,将被系统在适当的时候回收。await是async-wait的意思,它后边必须跟一个协程方法的调用,也就是生成了一个协程的实列(参见前面的说明)。这实际上就是在消息队列里放入了一个被await协程的启动消息,然后通过yield将控制权交还到消息循环,并且声明了该被await的协程结束的消息将被用于重新激活await的协程。另外有些东西值得注意:

  • 消息队列和消息循环在协程调度中处于中心位置。它们是属于一个线程的。一个线程不能拥有多个消息循环。
  • 协程的进入和退出使用消息循环驱动,而传统的方法调用是在运行栈顶上添加一个调用帧。由于栈的特性,传统的方法调用只能以同步的方式运行。而协程适合于异步的场景。
  • 协程的控制权不会被剥夺,必须由当前运行的协程主动交还到消息循环。方法是通过await别的协程或是运行完后返回。有建议在协程中,如果需要做耗时的计算,最好在过程中放入一些await asyncio.sleep(0)。这会将控制权立即交回到消息循环,并且由sleep将再度激活的消息放入到消息队列的尾端。从而给其他协程一个运行的机会,之后返回并激活本协程继续运行。例如在方法三中,如果认为time.sleep()是计算密集的操作,在其前面或是后面添加await asyncio.sleep(0)都会使其他协程加入到工作中,虽然在本例中不会节约运行时间。
  • 协程一旦运行阻塞的操作,将会阻塞整个线程,线程内的所有其他协程均不能获得运行机会。因为此时消息循环也不能获得运行机会。这就看出使用协程方式工作的异步IO库的重要性了。
  • 由于在一个线程内的多个协程是被轮流调度的,不会有多个协程同时运行,因此协程间的通信和共享不会发生争用的现象,协程相关的代码不需要考虑同步或保护。从而在很大程度上能够获得类似于多线程,多进程并行的性能提升,同时又避免了由并行引发的诸多问题。
  • 协程的应用本质上是为了避免等待IO时CPU空闲而造成的资源、时间的浪费。如果没有耗时的IO操作,CPU全程都忙于计算,那么使用协程将不会带来性能的提升。反而会由于消息循环的开销而使性能略微下降。
  • 协程没有专属的运行栈,其调度切换相对线程要轻量级很多,所以协程有可能被大规模部署并运行。但是这并不意味着没有任何限制。我们知道一个协程实际上是一个对象实例,大规模不受限制的部署将有可能耗尽内存等资源。类似于本例中的协程池的概念可以解决这个问题。或者可以使用asyncio中提供的同步原语(Synchronization Primitives)来限制协程生成和激活的数量。

以上是关于Python 协程(Coroutine)体验的主要内容,如果未能解决你的问题,请参考以下文章

操作系统OS,Python - 协程(Coroutine)

转载 Unity3D协同程序(Coroutine)

使用 python 线程时出现协程错误,RuntimeWarning: coroutine 'time_messege' 从未等待

Python asyncio run_coroutine_threadsafe 从不运行协程?

Python 协程 - Coroutines

Python协程