深入理解Kotlin协程协程的创建启动挂起函数理论篇

Posted 川峰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解Kotlin协程协程的创建启动挂起函数理论篇相关的知识,希望对你有一定的参考价值。

kotlin实现恢复挂起点是通过一个接口类Continuation(英文翻译过来叫"继续、延续、续体")来实现的。 Kotlin 续体有两个接口: Continuation  和 CancellableContinuation, 顾名思义 CancellableContinuation 是一个可以取消的 Continuation
public interface Continuation<in T> 
    public val context: CoroutineContext
    public fun resumeWith(result: Result<T>)

Continuation 成员

  • val contextCoroutineContext: 当前协程的 CoroutineContext 上下文
  • fun resumeWith(result: Result<T>): 传递 result 恢复协程
CancellableContinuation 成员
  • isActive, isCompleted, isCancelled: 表示当前 Continuation 的状态
  • fun cancel(cause: Throwable? = null): 可选通过一个异常 cause 来取消当前 Continuation 的执行
可以将 Continuation 看成是在挂起点恢复后需要执行的代码封装(状态机实现),比如说对如下逻辑:
suspend fun request() = suspendCoroutine<Response> 
    val response = doRequest()
    it.resume(response)


fun test() = runBlocking 
    val response = request()
    handle(response)

用下面的伪代码简单描述 Continuation 的工作:

interface Continuation<T> 
    fun resume(t: T)


fun request(continuation: Continuation<Response>) 
    val response = doRequest()
    continuation.resume(response)


fun test() 
    request(object :Continuation<Response>
        override fun resume(response: Response) 
            handle(response)
        
    )
对于 suspend 关键词修饰的挂起函数,编译器会为其增加一个 Continuation 续体类型的参数(相当于 CPS 中的回调),可以通过这个 Continuation 续体对象的 resume 方法返回结果值来恢复协程的执行。

协程的创建

在kotlin当中创建一个简单协程不是什么难事 标准库中提供了一个 createCoroutinue 函数,我们可以通过它来创建协程,不过这个协程并不会立即执行。 我们先来看看它的声明:     fun <T> (suspend () -> T).createCoroutine(completion: Continuation<T>): Continuation<Unit> 其中 suspend () -> T 是 createCoroutinue 函数的 Receiver
  • Receiver 是一个被 suspend 修饰的挂起函数,这也是协程的执行体,我们不妨称它为协程体。
  • 参数 completion 会在协程执行完成后调用,实际上就是协程的完成回调
  • 返回值是一个 Continuation 对象,由于现在协程仅仅被创建处理,因此需要通过这个值在之后触发协程的启动。

协程的启动

              我们已经知道如何创建协程,那么协程要如何运行呢?调用 continuation.resume(Unit) 之后,协程体会立即开始执行。                                               通过阅读 createCoroutine 的源码或者直接打断点调试,我们可以得知 continuation 是 SafeContinuation 的实例,不过可不要被它安全的外表骗了,它其实只是一个“马甲”。它有一个名为 delegate 的属性,这个属性才是 Continuation 的本体。            如果大家对 Java 字节码中的匿名内部类的命名方式比较熟悉,就会猜到这其实指代了某一个匿名内部类。那么新的问题产生了,哪儿来的匿名内部类?                      答案也很简单,就是我们的协程体,那个用以创建协程的 suspend lambda 表达式。编译器在对它编译之后,生成了一个匿名内部类,这个类继承自 SuspendLambda 类,而这个类又是 Continuation 接口的实现类。                           最后一个令人疑惑的点是, suspend lambda 表达式是如何编译的?一个函数如何对应一个类呢?这里其实不难理解,SuspendLambda 有一个抽象函数 invokeSuspend (这个函数在它的父类 BaseContinuationImpl 中声明),编译生成的匿名内部类中这个函数的实现就是我们的协程体。                       SuspendLambda继承关系: - Continuation: 续体,恢复协程的执行     - BaseContinuationImpl: 实现 resumeWith(Result) 方法,控制状态机的执行,定义了 invokeSuspend 抽象方法         - ContinuationImpl: 增加 intercepted 拦截器,实现线程调度等             - SuspendLambda: 封装协程体代码块                 - 协程体代码块生成的子类: 实现 invokeSuspend 方法,其内实现状态机流转逻辑               请注意,SafeContinuation 内部包含的对象就是编译器生成的匿名内部类,这个匿名内部类同时又是 SuspendLambda 的子类。            这样看来就非常清晰了,创建协程返回的 Continuation 实例就是套了几层马甲的协程体,因而调用它的 resume 就可以触发协程体的执行。                一般来讲,我们创建协程后就会立即让它开始执行,因此标准库也提供了一个一步到位的 API —— startCoroutine。它与 createCoroutine 除了返回值类型不同之外,剩下的完全一致。 我们已经知道,作为参数传入的 completion 就如同回调一样,协程体的返回值会作为 resumeWith 的参数传入。          协程体的Receiver          与协程的创建和启动相关的 API 一共有两组,除了前面的一组以外,还有一组:

仔细对比发现,这两组API的差异仅仅在于协程体自身的类型,这一组 API的协程体多了一个 Receiver 类型 R。这个 R 可以为协程体提供一个作用域,在协程体内我们可以直接使用作用域内提供的函数或状态等。

Kotlin 没有提供直接带有 Receiver 的 Lambda 表达式的语法,为了方便使用带有 Receiver 的协程 API,我们可以封装一个用以启动协程的函数 launchCoroutine,如代码清单3-2所示。

launchCoroutine 的第二个参数的 Receiver 类型实际上是编译器帮我们推导出来的,正好解决了无法直接声明带有 Receiver 的 Lambda 表达式的问题。

由于添加了作用域 ProducerScope 作为 Receiver,示例中我们可以在协程体中直接调用 produce 函数。delay 函数是我们在 ProducerScope 外部定义的挂起函数,在协程体内也可以自由调用。(类似launchCoroutine这样用以简化协程创建和启动的函数,通常被称为协程的构造器,它们的作用就是构造协程,但不要与类的构造器混淆)

作用域可以用来提供函数支持,自然也可以用来增加限制。如果我们为 Receiver 对应的类型增加一个 RestrictsSuspension 注解,那么在它的作用下,协程体内就无法调用外部的挂起函数了,如代码清单 3-4 所示。

  这里在 RestrictsSuspension 注解的作用下,协程体内部无法调用外部的挂起函数delay,这个特性对于不少在特定场景下创建的协程体有非常大的帮助,可以避免无效甚至危险的挂起函数的调用。标准库中的序列生成器(Sequence Builder)就使用了这个注解。

挂起函数

我们已经知道使用 suspend 关键字修饰的函数叫作挂起函数挂起函数只能在协程体内或 其他挂起函数内调用。这样一来,整个 Kotlin 语言体系内的函数就分为两派:普通函数挂起函数。其中挂起函数可以调用任何函数,普通函数只能调用普通函数

 通过以上两个挂起函数,我们发现挂起函数既可以像普通函数一样同步返回(如suspendFunc01) ,也可以处理异步逻辑(如 suspendFunc02)。 既然是函数,它们也有自己的函数类型,依次为 suspend (Int) -> Unit 和 suspend (String, String) -> Int。

在suspendFunc02 的定义中,我们再次用到了 suspendCoroutine<T> 获取当前所在协程体的 Continuation<T> 的实例作为参数将挂起函数当成异步函数来处理,在代码清单 3-8 的①处新创建线程执行 Continutation.resumeWith 操作,因此协程调用 suspendFunc02 无法同步执行,会进入挂起状态,直到结果返回。

所谓协程的挂起其实就是程序执行流程发生异步调用时,当前调用流程的执行状态进入等待状态。请注意,挂起函数不一定真的会挂起,只是提供了挂起的条件。那什么情况下才会真正挂起呢?

挂起点

在前面的 suspendFunc02 的定义中我们发现, 一个函数想要让自己挂起,所需要的无非就是一个 Continuation 实例,我们也确实可以通过 suspendCoroutine 函数获取到它,但是这个 Continuation 是从哪儿来的呢?            回想下协程的创建和运行过程,我们的协程体本身就是一个 Continuation 实例,正因如此挂起函数才能在协程体内运行。在协程内部挂起函数的调用处被称为挂起点,挂起点如果出现异步调用,那么当前协程就被挂起,直到对应的 Continuation 的 resume 函数被调用才会恢复执行。                  我们已经知道,通过 suspendCoroutine 函数获得的 Continuation 是一个 SafeContinuation的实例,与创建协程时得到的用来启动协程的 Continuation 实例没有本质上的差别。SafeContinuation 类的作用也非常简单,它可以确保只有发生异步调用时才会挂起,例如代码清单 3-8 所示的情况虽然也有 resume 函数的调用,但协程并不会真正挂起。

 异步调用是否发生,取决于 resume 函数与对应的挂起函数的调用是否在相同的调用栈上,切换函数调用栈的方法可以是切换到其他线程上执行,也可以是不切换线程但在当前函数返回之后的某一个时刻再执行。前者比较容易理解,后者其实通常就是先将 Continuation 的实例保 下来,在后续合适的时机再调用 ,存在事件循环的平台很容易做到这一点,例如 android 平台的主线程 Looper 循环。

CPS变换

Continuation Passing Style(续体传递风格): 约定一种编程规范,函数不直接返回结果值,而是在函数最后一个参数位置传入一个 callback 函数参数,并在函数执行完成时通过 callback 来处理结果。回调函数 callback 被称为续体(Continuation),它决定了程序接下来的行为,整个程序的逻辑通过一个个 Continuation 拼接在一起。             CPS 变换 (Continuation-Passing-Style Transformation),就是通过传递 Continuation 来控制异步调用流程的,并解决回调地狱和栈空间占用的问题。
  • Kotlin suspend 挂起函数写法与普通函数一样,但编译器会对 suspend 关键字的函数做 CPS 变换,这就是咱们常说的用看起来同步的方式写出异步的代码,消除回调地狱(callback hell)。
  • 另外为了避免栈空间过大的问题,  Kotlin 编译器并没有把代码转换成函数回调的形式,而是利用状态机模型。每两个挂起点之间可以看为一个状态,每次进入状态机时都有一个当前的状态,然后执行该状态对应的代码;如果程序执行完毕则返回结果值,否则返回一个特殊值,表示从这个状态退出并等待下次进入。相当于创建了一个可复用的回调,每次都使用这同一个回调,根据不同状态来执行不同的代码。
续体是一个较为抽象的概念,简单来说它包装了协程在挂起之后应该继续执行的代码;在编译的过程中,一个完整的协程被分割切块成一个又一个的续体,每个挂起函数被视作一个挂起点,挂起函数执行完毕后再从挂起点恢复后面续体的执行。

我们来想象一下,程序被挂起时,最关键的是要做什么?是保存挂起点。线程也类似,它被中断时,中断点就是被保存在调用栈中的。             Kotlin 协程挂起时就将挂起点的信息保存到了 Continuation 对象中。 Continuation 带了协程继续执行所需要的上下文,恢复执行的时候只需要执行它的恢复调用并且把需要的参数或者异常传入即可。作为一个普通的对象, Continuation 占用内存非常小,这也是无栈协程能够流行的一个重要原因。         我们在前面讲到,挂起函数如果需要挂起,则需要通过 suspendCoroutine 来获取 Continuation 实例,我们已经知道它是协程体,但是这个实例是怎么传进来的呢?            我们仍以代码清单 3-8 为例, notSuspend 函数看起来没有接收任何参数, Kotlin 语法似乎不会告诉我们任何真相了,想要创根问底有两种方法 种就是看字节码或者使用 Java 代码直接调用它,另一种就是使用 Kotlin 反射。这两个行为几乎都可以认为是在违反 Kotlin 语法了,仅做研究学习使用,生产环境中千万不要这么写。 我们先来看下如何用 Java 代码调用挂起函数,如代码清单 3-9 所示。

我们发现, suspend () -> Int 型的函数 notSuspend 在 Java 语言看来实际上是 (Continuation<Integer>) -> Object 型, 这正好与我们写的异步回调的方法类似,传一个回调进去等待结果返回就好了。

但是,这里为什么出现了返回值 Object ?通常我们回调方法是不会有返回值的,这里返回值 Object 有两种情况:
  • 挂起函数同步返回, 作为参数传入的 Continuation 的 resumeWith 不会被调用,数的实际返回值就是它作为挂起函数的返回值。notSuspend 尽管看起来似乎调用resumeWith, 不过调用对象是 SafeContinuation, 这点我们在前面已经多次提到,因此它的实现属于同步返回。
  • 挂起函数挂起执行异步逻辑。此时函数的实际返回值是一个挂起标志,通过这个标志外部协程就可以知道该函数需要挂起等到异步逻辑执行。在 Kotlin 中这个标志是个常量,定义在 Intrinsics.kt 当中:

 现在大家知道了原来挂起函数就是普通函数的参数中多了一个 Continuation 实例,难怪挂起函数总是可以调用普通函数,普通函数却不可以调用挂起函数。

现在请大家仔细想想,为什么 Kotlin 语法要求挂起函数一定要运行在协程体内或者其他挂起函数中呢?答案就是,任何一个协程体或者挂起函数中都有一个隐含 Continuation 实例,编译器能够对这个实例进行正确传递,并将这个细节隐藏在协程的背后,让我们的异步代码看起来像同步代码一样

 

添加了suspend关键字的函数,kotlin最终生成下面的方法:

可以看到,suspend函数实际上需要一个continuation参数

如果挂起函数没有真正被挂起(没有发生线程切换)返回值返回的就是实际参数类型,否则返回的是一个挂起标记。

Kotlin中可挂起的main函数实现原理:

suspend fun main() 
    ....
编译器会为suspend的main函数生成下面等价的代码:
fun main(continuation: Continuation<Unit>): Any? 
    return println("Hello")
实际上,源码中是在main()函数中调用了runSuspend方法,runSuspend当中创建并启动了一个协程体,只不过我们看不到这个代码。
fun main() 
    runSuspend(::main1 as suspend () -> Unit)

可以通过查看kotlin字节码反编译的Java代码来理解: 

/**
* Wrapper for `suspend fun main` and `@Test suspend fun testXXX` functions.
*/
@SinceKotlin("1.3")
internal fun runSuspend(block: suspend () -> Unit) 
    val run = RunSuspend()
    block.startCoroutine(run)
    run.await()


private class RunSuspend : Continuation<Unit> 
    override val context: CoroutineContext
        get() = EmptyCoroutineContext

    //-Xallow-result-return-type
    var result: Result<Unit>? = null

    override fun resumeWith(result: Result<Unit>) = synchronized(this) 
        this.result = result
        @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") (this as Object).notifyAll()
    

    fun await() = synchronized(this) 
        while (true) 
            when (val result = this.result) 
                null -> @Suppress("PLATFORM_CLASS_MAPPED_TO_KOTLIN") (this as Object).wait()
                else -> 
                    result.getOrThrow() // throw up failure
                    return
                
            
        
    
 可以看到是配合Object.wait()、Object.notifyAll()来实现的。 参考:    《深入理解Kotlin协程》- 2020年-机械工业出版社-霍丙乾 Kotlin协程之再次读懂协程工作原理https://juejin.cn/post/7137905800504148004

以上是关于深入理解Kotlin协程协程的创建启动挂起函数理论篇的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程协程的挂起和恢复 ② ( 协程挂起 和 线程阻塞 对比 )

Kotlin 协程协程的挂起和恢复 ② ( 协程挂起 和 线程阻塞 对比 )

Kotlin 协程协程的挂起和恢复 ① ( 协程的挂起和恢复概念 | 协程的 suspend 挂起函数 )

Kotlin 协程协程的挂起和恢复 ① ( 协程的挂起和恢复概念 | 协程的 suspend 挂起函数 )

深入理解Kotlin协程协程作用域启动模式调度器异常和取消使用篇

深入理解Kotlin协程协程的上下文 CoroutineContext