深入理解Kotlin协程CoroutineScope.launch源码追踪扒皮

Posted 川峰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解Kotlin协程CoroutineScope.launch源码追踪扒皮相关的知识,希望对你有一定的参考价值。

lifecycleScope、viewModelScope、GlobalScope、MainScope的上下文

协程最重要的就是协程上下文对象,因为通过上下文可以获取到协程相关的任何东西(Job、Dispatcher、Interceptor、Name、ExceptionHandler),所以有必要了解清楚常用的协程作用域对象中coroutineContext的组成。

lifecycleScope 的协程上下文是 SupervisorJob + Dispatchers.Main.immediate

 viewModelScope 的协程上下文是 SupervisorJob + Dispatchers.Main.immediate,lifecycleScope的一样

GlobalScope 的协程上下文是 EmptyCoroutineContex

MainScope 的协程上下文是 SupervisorJob + Dispatchers.Main

总结就是lifecycleScope、viewModelScope、MainScope这三种协程作用域的上下文组成是 SupervisorJob 加主线程调度器, 而 GlobalScope协程作用域的上下文则是一个空的上下文

CoroutineScope.cancel() 

CoroutineScope 有一个扩展函数 cancel() 可以用来取消内部启动的协程,比如Lifecycle 会在生命周期状态为 DESTROYED 时调用 cancel() 函数取消掉该作用域启动的协程,ViewModel 则会在 clear() 方法中调用 cancel() 函数,对于 Activity 和 Fragment 的 ViewModel ,clear() 方法也是在 destroy 回调里触发的。

来看一下 CoroutineScope.cancel() 方法的实现:

可以看到它是从协程上下文中获取Job对象,然后调用job对象的cancel方法进行取消的,而上面分析的四种Scope中,只有GlobalScope的协程上下文是空的,因此无法从中查询到Job对象,并且它的协程上下文只能获取,也不能通过+运算符设值(val属性),所以GlobalScope无法被cancel,这就是为什么在 android 中不推荐使用GlobalScope这个作用域来启动协程的原因。

launch方法解析

因为launch是CoroutineScope的一个扩展方法,因此以上四种作用的launch都是同一个方法,如下:

 其中 newCoroutineContext(context) 返回的是 scope作用域上下文 + context参数上下文 + Dispatchers.Default(如果未指定才添加)

上面 coroutine.start 的调用涉及到运算符重载,这里会比较绕,我们只需要知道这里实际上最终会调的是 CoroutineStart.invoke() 方法,在这个方法中根据当前的启动模式去分别执行不同方法,默认的启动模式是DEFAULT, 因此这里走第一个分支。

同时注意到,DEFAULT模式下,这里startCoroutineCancellable方法的receivercompletion参数都是StandaloneCoroutine对象。 在协程体的逻辑执行完后会调用到 completion 的 resume 方法恢复后面的续体代码执行(CPS)。

StandaloneCoroutine 是 AbstractCoroutine的实现类( AbstractCoroutine中会调用initParentJob方法,与父Job建立关联,当调用cancel方法或者子Job有异常时,可以将取消或者异常事件往上传播), 重写了父类的 handleJobException() 方法,改为调用自 CoroutineExceptionHandler 中的 handleCoroutineException() 方法。

startCoroutineCancellable方法的实现:

 其中createCoroutineUnintercepted会首先创建一个SuspendLambda子类的实例对象,接着调用拦截器进行线程调度,最终执行协程体逻辑。

SuspendLambda

createCoroutineUnintercepted 方法的实现(在IntrinsicsJvm.kt中):

 这里判断如果this当前对象是 BaseContinuatuionImpl 就调用它的create方法,而 BaseContinuatuionImpl 的最终实现类就是 SuspendLambda 类的子类对象,SuspendLambda 是一个抽象类。继承关系如下:

- Continuation: 续体,恢复协程的执行

    - BaseContinuationImpl: 实现 resumeWith(Result) 方法,控制状态机的执行,定义 invokeSuspend 抽象方法

        - ContinuationImpl: 增加 intercepted 拦截器,实现线程调度等

            - SuspendLambda: 封装协程体代码块

                - 协程体代码块生成的子类: 实现 invokeSuspend 方法,其内实现状态机流转逻辑

当我们写 scrop.launch 代码时,编译器会生成如下代码:

 可以看到编译器将Kotlin代码中的Lambda表达式变成了Java中的Function2对象,而它对应的具体类是SuspendLambda。因此前面createCoroutineUnintercepted 方法中的create就是调用的上面的create方法,在其中就是创建了一个SuspendLambda的匿名内部类对象并返回。

Continuation.intercepted()

 

 因此 Continuation.intercepted() 最终就是从context上下文中查询到拦截器,然后调用拦截器的方法返回一个DispatchedContinuation对象。该对象由CoroutineDispatcherContinuation两部分组成。Dispatcher就是线程调度器,即开始提到到的几种lifecycleScope、viewModelScope和MainScope的上下文中配置的默认调度器都是主线程调度器。

再回到最开始分析的 CoroutineStart 中 DEFAULT 分支调用的 startCoroutineCancellable 方法中,最后一步就是调用resumeCancellableWith方法:

DispatchedContinuation.resumeCancellableWith

resumeCancellableWith方法的实现:

 这里判断如果需要进行调度,则执行调度器的dispatch()方法,将当前对象分发到调度器上去执行,否则就直接执行resumeUndispatchedWith方法。

 

 在这个方法里面,就是调用了continuation.resumeWith()方法。前面分析了continuation的真正实现者是通过编译器生成的Function2类对象的create方法创建的SuspendLambda的子类的匿名对象,因此这里的continuation.resumeWith()方法就是调用生成的SuspendLambda子类匿名对象的resumeWith()。它被定义在BaseContinuationImpl中。

BaseContinuationImpl.resumeWith()

 resumeWith() 方法中会在一个 while 循环中调用前面编译器生成的SuspendLambda的匿名内部类对象的 invokeSuspend() 方法:

 如果 invokeSuspend() 方法里又调用了其他的挂起函数,那 invokeSuspend() 方法就会返回一个挂起标志 COROUTINE_SUSPEND,表示当前遇到了挂起函数,等待挂起函数执行完毕再继续。这时就会直接退出 while 循环,否则就把 invokeSuspend() 方法输出的结果封装到 Result 中并返回。 

拿到结果后,就会调用 releaseIntercepted() 方法让 ContinuationInterceptor释放拦截到的 Continuation 。

释放 Continuation 后,就会判断 completion 是否为 BaseContinuationImpl ,如果是的话就把当前要调用的 invokeSuspend() 方法的对象改为 BaseContinuationImpl 类型的 completion ,completion 的类型也是 Continuation, 然后走while循环再次调用 invokeSuspend() 方法,就是一个迭代版本的递归调用。

如果 completion 不是 BaseContinuationImpl ,就调用 completion 的 resumeWith() 方法 ,比如使用 runBlocking() 方法启动协程时,第一个任务的 completion 就是 BlockingCorotuine ,因为 BlockingCoroutine 的父类是 AbstractCoroutine ,而 AbstractCoroutine 实现了 Continuation 接口。还有前面分析中有提到的CoroutineStart类中DEFAULT模式下分支执行startCoroutineCancellable方法的completion参数,它是StandaloneCoroutine对象,也是AbstractCoroutine的子类。

AbstractCoroutine.resumeWith()

 如果 makeCompletingOnce方法返回的是等待子协程(COMPLETING_WAITING_CHILDREN)标志就返回,否则调用 afterResume() 方法,afterResume() 方法默认会调用 afterCompletion() 方法,它是在父类Jobsupported中定义默认是空实现,也就是说前面分析的StandaloneCoroutine对象中它什么也没做,但是有些类会实现它,比如 BlockingCoroutine 的 afterCompletion() 方法就会在当任务恢复执行的线程与调用 runBlocking 的线程不是同一个线程时,唤醒调用 runBlocking 的线程(blockedThread)。

总结:

  • 协程挂起与恢复的两个关键方法是: invokeSuspend() 和 resumeWith(Result)

  • 通过 Scrope.launch  启动的协程代码块会被编译器封装成 SuspendLambda 子类实例对象,实现invokeSuspend()方法

  • suspend 挂起函数与普通函数的唯一区别就是,编译器会对 suspend 关键字修饰的函数做 CPS 变换;但是 Kotlin 编译器并没有把代码转换成函数回调的形式,而是利用状态机模型(消除回调地狱, 解决栈空间占用问题)。 

invokeSuspend() 方法就是对协程代码块的封装,内部加入状态机机制将整个逻辑分为多块,分隔点就是每个挂起点。

协程启动时会先调用一次 invokeSuspend() 函数触发协程体的开始执行,后面每当调用到一个挂起函数时,挂起函数会返回 COROUTINE_SUSPENDED 标识,从而 return 停掉 invokeSuspend() 函数的执行,即非阻塞挂起。编译器会为挂起函数自动添加一个 continuation 续体对象参数,表示调用它的那个协程代码块,在该挂起函数执行完成后,就会调用到续体对象的 continuation.resumeWith() 方法来返回结果(或异常),而在 resumeWith() 中又调用了 invokeSuspend() 方法,其内根据状态机的状态来恢复协程的执行

Kotlin 协程中存在三层包装,每层包装都持有上层包装的引用,用来执行其 resumeWith() 方法做一些处理:

  • 第一层包装: launch & async 返回的 Job, Deferred 继承自 AbstractCoroutine, 里面封装了协程的状态,提供了 cancel 等接口;

  • 第二层包装: 编译器生成的 SuspendLambda 子类,封装了协程的真正执行逻辑,其继承关系为 SuspendLambda <- ContinuationImpl <- BaseContinuationImpl, 它的 completion 参数就是第一层包装实例;

  • 第三层包装: DispatchedContinuation, 封装了线程调度逻辑,它的 continuation 参数就是第二层包装实例。

这三层包装都实现了 Continuation 续体接口,通过代理模式将协程的各层包装组合在一起,每层负责不同的功能。

下图的 resumeWith() 可能表示 resume(), 也可能表示 resumeCancellableWith() 等系列方法:

 

witchContext()源码追踪

在 Android 中使用 withContext(),一般目的是把任务从主线程调度器切到工作线程调度器(如 Dispatchers.IO)执行,或者是从工作线程调度器切回主线程调度器 Dispatchers.Main 。

withContext() 中的代码块是调用了 suspendCoroutineUninterceptedOrReturn 方法来执行的:

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T 
    contract 
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    
    return suspendCoroutineUninterceptedOrReturn sc@  uCont ->
        // 拷贝旧的协程上下文计算生成新的协程上下文
        val oldContext = uCont.context
        // Copy CopyableThreadContextElement if necessary
        val newContext = oldContext.newCoroutineContext(context)
        // 检查Job是否是已取消状态
        newContext.ensureActive()
        // 快路径 #1 -- 新的协程上下文跟旧的协程上下文是同一个
        if (newContext === oldContext) 
            // 创建作用域协程
            val coroutine = ScopeCoroutine(newContext, uCont)
            // 不分发,直接启动执行协程代码
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        
        // 快路径 #2 -- 新旧协程上下文中调度器没有改变,直接进行
        // `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) 
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // 在新的协程上下文中以不分发的模式启动协程
            withCoroutineContext(newContext, null) 
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            
        
        // 慢路径 -- 新旧协程上下文中的调度器不一样,则使用新的调度器分发,以新的协程上下文创建协程
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine) // 以可取消的形式执行新协程
        coroutine.getResult() // 获取协程执行结果(挂起协程)
    

在 suspendCoroutineUninterceptedOrReturn() 代码块中,可以获取一个 Continuation 实例 uCont,这个 Continuation 实例就是 withContext() 代码块对应的 SuspendLambda 匿名内部类。

在这个代码块中,首先会尝试把旧的上下文与新的上下文进行结合,所谓的结合,主要就是替换掉上下文中 Key 为 ContinuationInterceptor 的元素,比如这个元素原来是默认协程分发器,把它换成 IO 协程分发器。

如果新的上下文和旧的上下文没有区别的话,就创建一个作用域协程 ScopeCoroutine ,并调用它的 startUndipsatchedOrReturn() 扩展函数启动协程。

如果新的协程上下文和旧的上下文的调度器相同的话,那就创建一个 UndispatchdCoroutine ,并调用它的 startUndipsatchedOrReturn() 扩展函数启动协程。

如果新的协程上下文和旧的协程上下文的调度器不相同的话,那就使用新的协程上下文(包含新的调度器)创建一个 DispatchedCoroutine 新实例,并调用它的 startCoroutineCancellable() 扩展函数,以可取消的形式启动新的协程,这个方法会把父协程挂起,也就是父协程代码块中的代码执行到 withContext() 后就不会继续执行,直到 withContext() 方法返回执行结果后,BaseContinuationImpl 中把值传给 completion ,completion 就是 DispatchedCoroutine ,然后 DispatchedCoroutine 就会把状态切换为 RESUMED 恢复执行。

startCoroutineCancellable()函数在前面分析 launch 方法已经分析过了,launch 与 withContext 在调用这个方法时唯一不同的点就是 completion 不同,在 launch 中 completion 是 StandaloneCoroutine 实例,而在这里 withContext 中  completion 是 DispatchedCoroutine 实例。

DispatchedCoroutine 继承自 ScopeCoroutine

internal open class ScopeCoroutine<in T>(
    context: CoroutineContext,
    @JvmField val uCont: Continuation<T> // SuspendLambda
) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame 
    final override val callerFrame: CoroutineStackFrame? get() = uCont as? CoroutineStackFrame
    final override fun getStackTraceElement(): StackTraceElement? = null
    // 返回true表示这是一个有作用域的协程
    final override val isScopedCoroutine: Boolean get() = true
    internal val parent: Job? get() = parentHandle?.parent

    // Job完成回调
    override fun afterCompletion(state: Any?) 
        // 以可取消的方式从其他上下文中进行resume
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    

    // 协程完成回调
    override fun afterResume(state: Any?) 
        // 返回结果给SuspendLambda
        uCont.resumeWith(recoverResult(state, uCont))
    

ScopeCooutine 继承了 AbstractCoroutine ,重写了 JobSupport 的 isScopedCoroutine 常量,重写了 JobSupport 的 afterCompletion() 和 AbstractCoroutine 的 afterResume() 方法。

在 ScopeCoroutine 的 afterCompletion() 方法中,在当前协程的工作完成或取消后,就会调用 uCont 的 intercepted() 方法,让协程分发器把它封装为 DispatchedContinuation ,并把结果传给这个 DispatchedContinuation ,uCont 就是 withContext() 代码块对应的 SuspendLambda 。

当 ScopeCoroutine 的 afterResume() 方法被调用时,它会调用 uCont 成员的 resumeWith() 方法 把结果回调给 SuspendLambda。

// Used by withContext when context dispatcher changes
internal class DispatchedCoroutine<in T>(
    context: CoroutineContext,
    uCont: Continuation<T>              // SuspendLambda
) : ScopeCoroutine<T>(context, uCont) 

    // 状态机
    private val _decision = atomic(UNDECIDED)

    ...

    // Job完成后,恢复协程的运行
    override fun afterCompletion(state: Any?) 
        afterResume(state)
    

    override fun afterResume(state: Any?) 
        if (tryResume()) return // 尝试恢复协程的运行
        // 恢复失败,说明协程处于挂起状态,通过调度器执行协程代码块
        uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
    

    // 执行获取结果
    fun getResult(): Any? 
        // 尝试挂起协程,挂起成功后返回挂起标志
        if (trySuspend()) return COROUTINE_SUSPENDED
        // 挂起失败
        val state = this.state.unboxState()
        if (state is CompletedExceptionally) throw state.cause
        @Suppress("UNCHECKED_CAST")
        return state as T
    

DispatchedCoroutine 有一个和 DispatchedContinuation 一样简单的决策状态机,这个状态机有 UNDECIDED、SUSPENDED 和 RESUMED 三种状态。

DispatchedCoroutine 重写了 AbstractCoroutine 的 afterResume() 方法,这个方法会在 resumeWith() 方法被调用的时候调用,协程的 resumeWith() 方法一般是在 SuspendLambda(代码块)执行完后调用的。

DispatchedCoroutine 的 afterResume() 方法首先会尝试把状态迁移到 RESUMED ,如果迁移失败的话,说明协程处于挂起状态,这时就要通过协程分发器再次把 SuspendLambda 封装为任务并进行分发。

在 DispatchedCoroutine 的 getResult() 方法中,会尝试把状态迁移到 SUSPENDED ,迁移成功则返回挂起标志,如果迁移失败的话,说明处于 RESUMED 状态,也就是已经获取到执行结果了,这时就不用再挂起了,直接把状态拆箱并返回。

unboxState() 扩展函数只是简单地判断了一下当前状态是否为未完成状态 IncompleteStateBox ,如果是的话,则返回它的 state 成员,否则返回当前状态。

参考:

Kotlin协程之再次读懂协程工作原理

抽丝剥茧聊Kotlin协程之协程启动原理

探索 Kotlin 协程原理

以上是关于深入理解Kotlin协程CoroutineScope.launch源码追踪扒皮的主要内容,如果未能解决你的问题,请参考以下文章

深入理解Kotlin协程协程调度器Dispatchers源码追踪扒皮

Kotlin协程基础概念深入理解

深入理解Kotlin协程协程的上下文 CoroutineContext

深入理解Kotlin协程CoroutineScope.launch源码追踪扒皮

深入理解Kotlin协程使用Job控制协程的生命周期

深入理解Kotlin协程lifecycleScope源码追踪扒皮