详解Python中的协程,为啥说它的底层是生成器?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了详解Python中的协程,为啥说它的底层是生成器?相关的知识,希望对你有一定的参考价值。

参考技术A

协程又称为是微线程,英文名是Coroutine。它和线程一样可以调度,但是不同的是线程的启动和调度需要通过操作系统来处理。并且线程的启动和销毁需要涉及一些操作系统的变量申请和销毁处理,需要的时间比较长。而协程呢,它的调度和销毁都是程序自己来控制的,因此它更加轻量级也更加灵活。

协程有这么多优点,自然也会有一些缺点,其中最大的缺点就是需要编程语言自己支持,否则的话需要开发者自己通过一些方法来实现协程。对于大部分语言来说,都不支持这一机制。go语言由于天然支持协程,并且支持得非常好,使得它广受好评,短短几年时间就迅速流行起来。

对于Python来说,本身就有着一个GIL这个巨大的先天问题。GIL是Python的全局锁,在它的限制下一个Python进程同一时间只能同时执行一个线程,即使是在多核心的机器当中。这就大大影响了Python的性能,尤其是在CPU密集型的工作上。所以为了提升Python的性能,很多开发者想出了使用多进程+协程的方式。一开始是开发者自行实现的,后来在Python3.4的版本当中,官方也收入了这个功能,因此目前可以光明正大地说,Python是支持协程的语言了。

生成器(generator)

生成器我们也在之前的文章当中介绍过,为什么我们介绍协程需要用到生成器呢,是因为Python的协程底层就是通过生成器来实现的。

通过生成器来实现协程的原因也很简单,我们都知道协程需要切换挂起,而生成器当中有一个yield关键字,刚好可以实现这个功能。所以当初那些自己在Python当中开发协程功能的程序员都是通过生成器来实现的,我们想要理解Python当中协程的运用,就必须从最原始的生成器开始。

生成器我们很熟悉了,本质上就是带有yield这个关键词的函数。

def test():

    n = 0

    while n < 10:

        val = yield n 

        print('val = '.format(val))

        n += 1

这个函数当中如果没有yield这个语句,那么它就是一个普通的Python函数。加上了val = yield n这个语句之后,它有什么变化呢?

我们尝试着运行一下:

# 调用test函数获得一个生成器

g = test()

print(next(g))

print(next(g))

print(next(g))

得到这么一个结果:

输出的0,1,2很好理解,就是通过next(g)返回的,这个也是生成器的标准用法。奇怪的是为什么val=None呢?val不应该等于n么?

这里想不明白是正常的,因为这里涉及到了一个新的用法就是生成器的send方法。当我们在yield语句之前加上变量名的时候,它的含义其实是返回yield之后的内容,再从外界接收一个变量。也就是说当我们执行next(g)的时候,会从获取yield之后的数,当我们执行g.send()时,传入的值会被赋值给yield之前的数。比如我们把执行的代码改成这样:

g = test()

print(next(g))

g.send('abc')

print(next(g))

print(next(g))

我们再来看执行的结果,会发现是这样的:

第一行val不再是None,而是我们刚刚传入的abc了。

队列调度

生成器每次在执行到yield语句之后都会自然挂起,我们可以利用这一点来当做协程来调度。我们可以自己实现一个简易的队列来模拟这个过程。

首先我们声明一个双端队列,每次从队列左边头部获取任务,调度执行到挂起之后,放入到队列末尾。相当于我们用循环的方式轮询执行了所有任务,并且这整个全程不涉及任何线程创建和销毁的过程。

class Scheduler:

    def __init__(self):

        self._queue = deque()

    def new_task(self, task):

        self._queue.append(task)

    def run(self):

        while self._queue:

            # 每次从队列左侧获取task

            task = self._queue.popleft()

            try:

                # 通过next执行之后放入队列右侧

                next(task)

                self._queue.append(task)

            except StopIteration:

                pass

sch = Scheduler()

sch.new_task(test(5))

sch.new_task(test(10))

sch.new_task(test(8))

sch.run()

这个只是一个很简易的调度方法,事实上结合上yield from以及send功能,我们还可以实现出更加复杂的协程调度方式。但是我们也没有必要一一穷尽,只需要理解最基础的方法就可以了,毕竟现在我们使用协程一般也不会自己实现了,都会通过官方原生的工具库来实现。

@asyncio.coroutine

在Python3.4之后的版本当中,我们可以通过@asyncio.coroutine这个注解来将一个函数封装成协程执行的生成器。

在吸收了协程这个概念之后,Python对生成器以及协程做了区分。加上了@asyncio.coroutine注解的函数称为协程函数,我们可以用iscoroutinefunction()方法来判断一个函数是不是协程函数,通过这个协程函数返回的生成器对象称为协程对象,我们可以通过iscoroutine方法来判断一个对象是不是协程对象。

比如我把刚刚写的函数上加上注解之后再来执行这两个函数都会得到True:

import asyncio

@asyncio.coroutine

def test(k):

    n = 0

    while n < k:

        yield

        print('n = '.format(n))

        n += 1

        print(asyncio.iscoroutinefunction(test))

print(asyncio.iscoroutine(test(10)))

那我们通过注解将方法转变成了协程之后,又该怎么使用呢?

一个比较好的方式是通过asynio库当中提供的loop工具,比如我们来看这么一个例子:

loop = asyncio.get_event_loop()

loop.run_until_complete(test(10))

loop.close()

我们通过asyncio.get_event_loop函数创建了一个调度器,通过调度器的run相关的方法来执行一个协程对象。我们可以run_until_complete也可以run_forever,具体怎么执行要看我们实际的使用场景。

async,await和future

从Python3.5版本开始,引入了async,await和future。我们来简单说说它们各自的用途,其中async其实就是@asyncio.coroutine,用途是完全一样的。同样await代替的是yield from,意为等待另外一个协程结束。

我们用这两个一改,上面的代码就成了:

async def test(k):

    n = 0

    while n < k:

        await asyncio.sleep(0.5)

        print('n = '.format(n))

        n += 1

由于我们加上了await,所以每次在打印之前都会等待0.5秒。我们把await换成yield from也是一样的,只不过用await更加直观也更加贴合协程的含义。

Future其实可以看成是一个信号量,我们创建一个全局的future,当一个协程执行完成之后,将结果存入这个future当中。其他的协程可以await future来实现阻塞。我们来看一个例子就明白了:

future = asyncio.Future()

async def test(k):

    n = 0

    while n < k:

        await asyncio.sleep(0.5)

        print('n = '.format(n))

        n += 1

    future.set_result('success')

async def log():

    result = await future

    print(result)

loop = asyncio.get_event_loop()

loop.run_until_complete(asyncio.wait([

    log(),

    test(5)

]))

loop.close()

在这个例子当中我们创建了两个协程,第一个协程是每隔0.5秒print一个数字,在print完成之后把success写入到future当中。第二个协程就是等待future当中的数据,之后print出来。

在loop当中我们要调度执行的不再是一个协程对象了而是两个,所以我们用asyncio当中的wait将这两个对象包起来。只有当wait当中的两个对象执行结束,wait才会结束。loop等待的是wait的结束,而wait等待的是传入其中的协程的结束,这就形成了一个依赖循环,等价于这两个协程对象结束,loop才会结束。

总结

async并不只是可以用在函数上,事实上还有很多其他的用法,比如用在with语句上,用在for循环上等等。这些用法比较小众,细节也很多,就不一一展开了,大家感兴趣的可以自行去了解一下。

不知道大家在读这篇文章的过程当中有没有觉得有些费劲,如果有的话,其实是很正常的。原因也很简单,因为Python原生是不支持协程这个概念的,所以在一开始设计的时候也没有做这方面的准备,是后来觉得有必要才加入的。那么作为后面加入的内容,必然会对原先的很多内容产生影响,尤其是协程借助了之前生成器的概念来实现的,那么必然会有很多耦合不清楚的情况。这也是这一块的语法很乱,对初学者不友好的原因。

深潜Koltin协程:底层中的协程

系列电子书:传送门


底层的协程

有这么一种人,他们不能接受仅仅是只会开车,他们还想要了解引擎盖下面是如何运作了。我就是这样的人,所以我必须弄清楚协程是如何工作的。如果你也是这样的人,你也会喜欢本章的。如果不是的话,你可以跳过本章。

本章不会介绍任何你可能会用到的工具,只是纯粹的阐述。我将尝试在一个令所有人都接受的层级上解释协程是如何工作的。重点如下:

  • 挂起函数就像状态机一样,在函数的开始,以及每次挂起协程后,都会有一个不同状态
  • 标识状态的数字和本地数据都保存在了 continuation 对象中 (注:我们上一章中提到了 Continuation,它是一个概念,也是一个类型。它就像是游戏中的存档点,本章我们将着重讨论它)
  • 一个函数的 Continuation 将它之后的调用行为包装了起来。因此,所有这些 Continuation 表示的是恢复函数的调用栈,

如果你有兴趣了解一些内部结构(当然是简化的),请继续阅读一下内容。

传递 Continuation 的方式

有几种方式可以实现挂起功能,但 Kotlin 团队决定采用一种称为“传递 Continuation”的方式。这意味着 continuation 将作为参数从一个函数传递到另外一个函数。按照惯例,continuation 位于最后一个参数位置:

suspend fun getUser(): User?
suspend fun setUser(user: User)
suspend fun checkAvailability(flight: Flight): Boolean

// 这些函数在底层的样子
fun getUser(continuation: Continuation<*>): Any?
fun setUser(user: User, continuation: Continuation<*>): Any
fun checkAvailability(
    flight: Flight,
    continuation: Continuation<*>
): Any

你可能还注意到,底层函数的返回类型不同于最初的返回类型,它变成了 AnyAny?,为什么会这样呢?原因是挂起函数可能会被挂起,所以这个时候它不能返回被声明的类型。在这种情况下,它返回一个特殊的 COROUTINE_SUSPENDED 标记,稍后我们将在实践中看到。现在,只需要注意,因为 getUser 可能返回 User?COROUTINE_SUSPENDED(其类型为 Any)两种数据,所以该函数返回类型必须是 User?Any 的超类,所以就只能是 Any? 了。 也许有一天 Kotlin 会引入联合类型,在这种情况下我们就可以使用 User? | COROUTINE_SUSPENDED 来代替了。

一个非常简单的函数

为了更深入地理解,让我们从一个非常简单的函数开始。该函数在 delay 前后打印一些东西:

suspend fun myFunction() 
    println("Before")
    delay(1000) // suspending
    println("After")

你已经可以推断出 myFunction 的函数签名在底层是什么样的了:

fun myFunction(continuation: Continuation<*>): Any

接下来,这个函数需要要一个属于它的 continuation 来记住它的状态。让我们将其命名为 MyFunctionContinuation(实际的 continuation 是一个对象表达式,没有名称,但是这样解释会更加容易)。在函数体的开头, MyFuntion 会将自己的 continuation(参数)包装到自己的 continuation (MyFunctionContinuation) 中。

val continuation = MyFunctionContinuation(continuation)

上面的行为只有在 continuation 还没有包装的情况下才会包装。如果已经包装了的话,我们应该保持 continuation 不变,因为那时已经到了函数恢复的过程了(现在可能会让人疑惑,但稍后你会更好地明白这是为什么)。

val continuation =
    if (continuation is MyFunctionContinuation) continuation  // 如果已经包装,就保持不变
    else MyFunctionContinuation(continuation)   // 否则,包装 continuation

这个条件表达式可以简化为:

val continuation = continuation as? MyFunctionContinuation
    ?: MyFunctionContinuation(continuation)

最后,让我们来谈谈函数体:

suspend fun myFunction() 
    println("Before")
    delay(1000) // suspending
    println("After") 

函数可以从两个地方开始:

  1. 开始(在第一次调用的情况下)
  2. 被挂起之后(被 continuation 恢复的情况下)

为了标识当前状态,我们使用了一个名为 label 的字段。初始值是 0,代表函数刚开始调用。然后,函数在执行到每个挂起点之前,label 都会被设置到下一个状态, 由此,我们就可以根据 label 从恢复后的挂起点开始运行函数了。

// myFunction 在底层简化过后的代码
fun myFunction(continuation: Continuation<Unit>): Any 
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)
    
    if (continuation.label == 0) 
        println("Before")
        continuation.label = 1
        if (delay(1000, continuation) == COROUTINE_SUSPENDED) 
            return COROUTINE_SUSPENDED
         
    
    
    if (continuation.label == 1) 
        println("After")
        return Unit
    
    error("Impossible") 

最后一个重点也在上面的代码片段中展示出来。当 delay 函数挂起时,它返回 COROUTINE_SUSPENDED ,然后 myFunction 返回 COROUTINE_SUSPENDED,就像递归一样,调用它的函数,调用该函数的函数…直到顶层调用链,所有的函数都要执行先沟通的操作,即返回这个 COROUTINE_SUSPENDED。这就是挂起函数如何暂停所有这些函数,并留下线程供其他可运行对象(包括协程)继续工作的方式。

让我们继续分析上面的代码,如果这个 delay 没有返回 COROUTINE_SUSPENDED 会发生什么呢?如果它只是返回 Unit (我们知道它不会,但是让我们假设一下)呢?请注意,如果 continuation 只返回了 Unit,我们就会进入到下一个状态判断去,函数的行为和其它函数一样。

现在,让我们讨论 continuation,它被实现成一个匿名类,简化后,它是这样的:

cont = object : ContinuationImpl(continuation) 
    var result: Any? = null
    var label = 0
    override fun invokeSuspend(`$result`: Any?): Any? 
        this.result = `$result`;
        return myFunction(this);
    

为了提高函数可读性,我前面将其表示为一个名为 MyFunctionContinuation 的类。我还决定通过内联 ContinuationImpl 主体来隐藏继承。为了简化,我跳过了许多优化和机能化,只保留必要的代码部分。

在 JVM 中,类型参数在编译期间会被删除,例如 Continuation<Unit>Continuation<String> 最后将是 Continuation。因为我们在这里展示的所有内容都是用 Kotlin 表示的 JVM 字节码,所以你跟不用担心这些类型参数的问题。

下面的代码就是简化了完全的 myFunction 在底层的样子:

fun myFunction(continuation: Continuation<Unit>): Any 
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)
    if (continuation.label == 0) 
        println("Before")
        continuation.label = 1
        
        if (delay(1000, continuation) == COROUTINE_SUSPENDED)
            return COROUTINE_SUSPENDED
        
    
    if (continuation.label == 1) 
        println("After")
        return Unit
    
    error("Impossible") 

    
class MyFunctionContinuation(
    val completion: Continuation<Unit>
) : Continuation<Unit> 
    override val context: CoroutineContext
        get() = completion.context

    var label = 0
    var result: Result<Any>? = null
    override fun resumeWith(result: Result<Unit>) 
        this.result = result
        val res = try 
            val r = myFunction(this)
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit) 
         catch (e: Throwable) 
            Result.failure(e)
        
        completion.resumeWith(res)
     

如果你想自己分析挂起函数的底层运作,在Intellij IDEA 中打开 Tool -> Kotlin -> Show Koltin bytecode, 然后点击 “Decompile” 按钮,结果你将看到这段代码被反编译成 Java(这些代码或多或少使用 Java 编写的样子)。


带有状态的函数

如果函数有一些状态(比如局部变量或参数)需要在挂起后恢复。那么这个状态需要在函数的 continuation 中持有,让我们思考下面这个函数:

suspend fun myFunction() 
    println("Before")
    var counter = 0 // myFunction 的一个状态(局部变量)
    delay(1000) // suspending
    counter++
    println("Counter: $counter")
    println("After") 

这里 counter 在两种状态下都是被需要的(对于等于0和等于1的 label ),因此需要在 continuation 中保留它。它会在挂起之前被存储起来,函数开始后则会恢复这些属性。下面是(简化后的)函数在底层的样子:

fun myFunction(continuation: Continuation<Unit>): Any 
    val continuation = continuation as? MyFunctionContinuation
        ?: MyFunctionContinuation(continuation)
    var counter = continuation.counter
    
    if (continuation.label == 0) 
        println("Before")
        counter = 0
        continuation.counter = counter
        continuation.label = 1
        
        if (delay(1000, continuation) == COROUTINE_SUSPENDED)
            return COROUTINE_SUSPENDED
        
    
    
    if (continuation.label == 1) 
        counter = (counter as Int) + 1
        println("Counter: $counter")
        println("After")
        return Unit
    
    error("Impossible") 


class MyFunctionContinuation(
    val completion: Continuation<Unit>
) : Continuation<Unit> 
    
    override val context: CoroutineContext
        get() = completion.context
    
    var result: Result<Unit>? = null
    var label = 0

    var counter = 0  // 这里将状态存储起来了
    
    override fun resumeWith(result: Result<Unit>) 
        this.result = result
        val res = try 
            val r = myFunction(this)
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit)
         catch (e: Throwable) 
            Result.failure(e)
        
        completion.resumeWith(res)
    

带值恢复的函数

如果我们期望从挂起函数获取一些数据,情况会略有不同,让我们来分析下面函数:

suspend fun printUser(token: String) 
    println("Before")
    val userId = getUserId(token) // suspending
    println("Got userId: $userId")
    val userName = getUserName(userId, token) // suspending
    println(User(userId, userName))
    println("After") 

这里有两个挂起函数: getUserIdgetUserName,我们还添加了一个参数 token,我们的挂起函数还返回了一些值。下面这些信息都需要存储在 Continuation 中:

  • token, 因为在状态 0 和 状态 1 都使用了它
  • userId, 因为在状态 1 和 状态 2 都使用了它
  • Result 类型的 result,表示函数如何恢复

如果函数恢复了一个值,结果将是 Result.Success(vaule),在这种情况下,我们可以获得并使用这个值。 如果在异常情况下,结果将是 Result.Failure(exception)。在这种情况下,将会抛出此异常。

fun printUser(
    token: String,
    continuation: Continuation<*>
): Any 
    val continuation = continuation as? PrintUserContinuation
        ?: PrintUserContinuation(
            continuation as Continuation<Unit>,token)
        
    var result: Result<Any>? = continuation.result
    var userId: String? = continuation.userId
    val userName: String
    
    if (continuation.label == 0) 
        println("Before")
        continuation.label = 1
        val res = getUserId(token, continuation)
        if (res == COROUTINE_SUSPENDED) 
            return COROUTINE_SUSPENDED
        
        result = Result.success(res)
    
    
    if (continuation.label == 1) 
        userId = result!!.getOrThrow() as String
        println("Got userId: $userId")
        continuation.label = 2
        continuation.userId = userId
        val res = getUserName(userId, continuation)
        if (res == COROUTINE_SUSPENDED) 
            return COROUTINE_SUSPENDED
        
        result = Result.success(res)
    
    
    if (continuation.label == 2) 
        userName = result!!.getOrThrow() as String
        println(User(userId as String, userName))
        println("After")
        return Unit
    
    error("Impossible") 


class PrintUserContinuation(
    val completion: Continuation<Unit>,
    val token: String
) : Continuation<String> 
    override val context: CoroutineContext
        get() = completion.context
    var label = 0
    var result: Result<Any>? = null
    var userId: String? = null
    override fun resumeWith(result: Result<String>) 
        this.result = result
        val res = try 
            val r = printUser(token, this)
            if (r == COROUTINE_SUSPENDED) return
            Result.success(r as Unit)
         catch (e: Throwable) 
            Result.failure(e)
        
    completion.resumeWith(res)
    

调用栈

当函数 a 调用函数 b 时,虚拟机需要将函数 a 的状态存储在某个地方,同样还有函数 b 完成时执行返回的地址。所有这些都存储在一个成为调用栈的结构中。 问题是,当我们挂起时,我们释放了一个线程,我们因此清空了调用栈。所以,当我们恢复时,调用栈是没有用的。相反, continuation 充当了调用栈。每个 continuation 都会在我们挂起时(作为 label)保留函数的局部变量和参数,以及对调用该函数的函数的 continuation 的引用。一个 continuation 引用另一个 continuation,后者又引用另一个 continutaion,从此往复。 因此,我们的 continuation 就像一个巨大的洋葱:它保留了常规保存在调用栈中的所有内容。看看下面的例子:

suspend fun a() 
    val user = readUser()
    b()
    b()
    b()
    println(user)


suspend fun b() 
    for (i in 1..10) 
        c(i)
    


suspend fun c(i: Int) 
    delay(i * 100L)
    println("Tick") 

i 等于4时,一个 continuation 引用可以表示为:

CContinuation(
    i = 4,
    label = 1,
    completion = BContinuation(
        i = 4,
        label = 1,
        completion = AContinuation(
            label = 2,
            user = User@1234,
            completion = ...
        )
    ) 
)

看看上面的表示,“Tick”会被打印了多少次(假设 readuUser 不是一个挂起函数)? (答案是13次)

当一个 continuation 被恢复时,每个 continuation 首先调用它的函数,完成此操作后,continutaion 将恢复调用该函数的函数的 continutaion。这个 continutaion 再调用它的函数,这个过程循环往复,直到到达调用链的顶部。

override fun resumeWith(result: Result<String>) 
    this.result = result
    val res = try 
        val r = printUser(token, this)
        if (r == COROUTINE_SUSPENDED) return
        Result.success(r as Unit)
    
    catch (e: Throwable) 
        Result.failure(e)
    
    completion.resumeWith(res)

例如,思考这样一个情况: 函数 a 调用函数 b,函数 b 调用挂起函数 c。在恢复的过程中, c 的continutaion 首先恢复 c 函数,一旦这个函数完成, c 的 continutaion 将会调用 b 函数的 continutaion。完成后, b 的 continutaion 调用 a 的 continutaion,后者调用 a 的函数。

整个过程可以用下面这张图片来展示:

与异常看起来相似:未捕获的异常在 resumeWith 中被捕获,然后用 Result.failure(e) 包装,然后再用这个结果恢复调用我们函数的函数。

我希望这些都能让你们对挂起时发生的事情有一些了解。状态需要存储在 continutaion 中,需要支持挂起机制,恢复时,需要从 continutaion 恢复状态,并使用结果或抛出异常。

实际代码

continutaion 和挂起函数编译后的实际代码更为复杂,因为它包含了一些优化和额外的机制,比如:

  • 构建了更好的异常堆栈跟踪
  • 增加了协程挂起拦截的特性(我们将在后面讨论这个特性)
  • 不同级别的优化(如删除未使用的变量,或尾调用优化)

下面是来自 Kotlin 1.5.30 版本的 BaseContinuationImpl 的一部分,它展示了实际 resumeWith 代码(还有其他方法和一些跳过的注释):

internal abstract class BaseContinuationImpl(
    val completion: Continuation<Any?>?
) : Continuation<Any?>, CoroutineStackFrame, Serializable 
    // This implementation is final. This fact is used to
    // unroll resumeWith recursion.
    final override fun resumeWith(result: Result<Any?>) 
        // This loop unrolls recursion in
        // current.resumeWith(param) to make saner and
        // shorter stack traces on resume
        var current = this
        var param = result
        while (true) 
            // Invoke "resume" debug probe on every resumed
            // continuation, so that a debugging library
            // infrastructure can precisely track what part
            // of suspended call stack was already resumed
            probeCoroutineResumed(current)
            with(current) 
                val completion = completion!! // fail fast
                // when trying to resume continuation
                // without completion
                val outcome: Result<Any?> =
                try 
                    val outcome = invokeSuspend(param)
                    if (outcome === COROUTINE_SUSPENDED)
                    return
                    Result.success(outcome)
                 catch (exception: Throwable) 
                    Result.failure(exception)
                
                releaseIntercepted()
                // this state machine instance is terminating
                if (completion is BaseContinuationImpl) 
                    深潜Koltin协程:底层中的协程

Unity的协程详解

2020-08-20:GO语言中的协程与Python中的协程的区别?

关于Python的协程问题总结

基于python生成器封装的协程类

协程及Python中的协程