深入理解Kotlin协程CoroutineScope.launch源码追踪扒皮
Posted 川峰
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解Kotlin协程CoroutineScope.launch源码追踪扒皮相关的知识,希望对你有一定的参考价值。
lifecycleScope、viewModelScope、GlobalScope、MainScope的上下文
协程最重要的就是协程上下文对象,因为通过上下文可以获取到协程相关的任何东西(Job、Dispatcher、Interceptor、Name、ExceptionHandler),所以有必要了解清楚常用的协程作用域对象中coroutineContext的组成。
lifecycleScope 的协程上下文是 SupervisorJob + Dispatchers.Main.immediate
viewModelScope 的协程上下文是 SupervisorJob + Dispatchers.Main.immediate,和lifecycleScope的一样
GlobalScope 的协程上下文是 EmptyCoroutineContex
MainScope 的协程上下文是 SupervisorJob + Dispatchers.Main
总结就是lifecycleScope、viewModelScope、MainScope这三种协程作用域的上下文组成是 SupervisorJob 加主线程调度器, 而 GlobalScope协程作用域的上下文则是一个空的上下文
CoroutineScope.cancel()
CoroutineScope 有一个扩展函数 cancel() 可以用来取消内部启动的协程,比如Lifecycle 会在生命周期状态为 DESTROYED 时调用 cancel() 函数取消掉该作用域启动的协程,ViewModel 则会在 clear() 方法中调用 cancel() 函数,对于 Activity 和 Fragment 的 ViewModel ,clear() 方法也是在 destroy 回调里触发的。
来看一下 CoroutineScope.cancel() 方法的实现:
可以看到它是从协程上下文中获取Job对象,然后调用job对象的cancel方法进行取消的,而上面分析的四种Scope中,只有GlobalScope的协程上下文是空的,因此无法从中查询到Job对象,并且它的协程上下文只能获取,也不能通过+运算符设值(val属性),所以GlobalScope无法被cancel,这就是为什么在 android 中不推荐使用GlobalScope这个作用域来启动协程的原因。
launch方法解析
因为launch是CoroutineScope的一个扩展方法,因此以上四种作用的launch都是同一个方法,如下:
其中 newCoroutineContext(context) 返回的是 scope作用域上下文 + context参数上下文 + Dispatchers.Default(如果未指定才添加)
上面 coroutine.start 的调用涉及到运算符重载,这里会比较绕,我们只需要知道这里实际上最终会调的是 CoroutineStart.invoke() 方法,在这个方法中根据当前的启动模式去分别执行不同方法,默认的启动模式是DEFAULT, 因此这里走第一个分支。
同时注意到,DEFAULT模式下,这里startCoroutineCancellable方法的receiver和completion参数都是StandaloneCoroutine对象。 在协程体的逻辑执行完后会调用到 completion 的 resume 方法恢复后面的续体代码执行(CPS)。
StandaloneCoroutine 是 AbstractCoroutine的实现类( AbstractCoroutine中会调用initParentJob方法,与父Job建立关联,当调用cancel方法或者子Job有异常时,可以将取消或者异常事件往上传播), 重写了父类的 handleJobException() 方法,改为调用自 CoroutineExceptionHandler 中的 handleCoroutineException() 方法。
startCoroutineCancellable方法的实现:
其中createCoroutineUnintercepted会首先创建一个SuspendLambda子类的实例对象,接着调用拦截器进行线程调度,最终执行协程体逻辑。
SuspendLambda
createCoroutineUnintercepted 方法的实现(在IntrinsicsJvm.kt中):
这里判断如果this当前对象是 BaseContinuatuionImpl 就调用它的create方法,而 BaseContinuatuionImpl 的最终实现类就是 SuspendLambda 类的子类对象,SuspendLambda 是一个抽象类。继承关系如下:
- Continuation: 续体,恢复协程的执行
- BaseContinuationImpl: 实现 resumeWith(Result) 方法,控制状态机的执行,定义 invokeSuspend 抽象方法
- ContinuationImpl: 增加 intercepted 拦截器,实现线程调度等
- SuspendLambda: 封装协程体代码块
- 协程体代码块生成的子类: 实现 invokeSuspend 方法,其内实现状态机流转逻辑
当我们写 scrop.launch 代码时,编译器会生成如下代码:
可以看到编译器将Kotlin代码中的Lambda表达式变成了Java中的Function2对象,而它对应的具体类是SuspendLambda。因此前面createCoroutineUnintercepted 方法中的create就是调用的上面的create方法,在其中就是创建了一个SuspendLambda的匿名内部类对象并返回。
Continuation.intercepted()
因此 Continuation.intercepted() 最终就是从context上下文中查询到拦截器,然后调用拦截器的方法返回一个DispatchedContinuation对象。该对象由CoroutineDispatcher和Continuation两部分组成。Dispatcher就是线程调度器,即开始提到到的几种lifecycleScope、viewModelScope和MainScope的上下文中配置的默认调度器都是主线程调度器。
再回到最开始分析的 CoroutineStart 中 DEFAULT 分支调用的 startCoroutineCancellable 方法中,最后一步就是调用resumeCancellableWith方法:
DispatchedContinuation.resumeCancellableWith
resumeCancellableWith方法的实现:
这里判断如果需要进行调度,则执行调度器的dispatch()方法,将当前对象分发到调度器上去执行,否则就直接执行resumeUndispatchedWith方法。
在这个方法里面,就是调用了continuation.resumeWith()方法。前面分析了continuation的真正实现者是通过编译器生成的Function2类对象的create方法创建的SuspendLambda的子类的匿名对象,因此这里的continuation.resumeWith()方法就是调用生成的SuspendLambda子类匿名对象的resumeWith()。它被定义在BaseContinuationImpl中。
BaseContinuationImpl.resumeWith()
resumeWith() 方法中会在一个 while 循环中调用前面编译器生成的SuspendLambda的匿名内部类对象的 invokeSuspend() 方法:
如果 invokeSuspend() 方法里又调用了其他的挂起函数,那 invokeSuspend() 方法就会返回一个挂起标志 COROUTINE_SUSPEND,表示当前遇到了挂起函数,等待挂起函数执行完毕再继续。这时就会直接退出 while 循环,否则就把 invokeSuspend() 方法输出的结果封装到 Result 中并返回。
拿到结果后,就会调用 releaseIntercepted() 方法让 ContinuationInterceptor释放拦截到的 Continuation 。
释放 Continuation 后,就会判断 completion 是否为 BaseContinuationImpl ,如果是的话就把当前要调用的 invokeSuspend() 方法的对象改为 BaseContinuationImpl 类型的 completion ,completion 的类型也是 Continuation, 然后走while循环再次调用 invokeSuspend() 方法,就是一个迭代版本的递归调用。
如果 completion 不是 BaseContinuationImpl ,就调用 completion 的 resumeWith() 方法 ,比如使用 runBlocking() 方法启动协程时,第一个任务的 completion 就是 BlockingCorotuine ,因为 BlockingCoroutine 的父类是 AbstractCoroutine ,而 AbstractCoroutine 实现了 Continuation 接口。还有前面分析中有提到的CoroutineStart类中DEFAULT模式下分支执行startCoroutineCancellable方法的completion参数,它是StandaloneCoroutine对象,也是AbstractCoroutine的子类。
AbstractCoroutine.resumeWith()
如果 makeCompletingOnce方法返回的是等待子协程(COMPLETING_WAITING_CHILDREN)标志就返回,否则调用 afterResume() 方法,afterResume() 方法默认会调用 afterCompletion() 方法,它是在父类Jobsupported中定义默认是空实现,也就是说前面分析的StandaloneCoroutine对象中它什么也没做,但是有些类会实现它,比如 BlockingCoroutine 的 afterCompletion() 方法就会在当任务恢复执行的线程与调用 runBlocking 的线程不是同一个线程时,唤醒调用 runBlocking 的线程(blockedThread)。
总结:
-
协程挂起与恢复的两个关键方法是: invokeSuspend() 和 resumeWith(Result)。
-
通过 Scrope.launch 启动的协程代码块会被编译器封装成 SuspendLambda 子类实例对象,实现invokeSuspend()方法
-
suspend 挂起函数与普通函数的唯一区别就是,编译器会对 suspend 关键字修饰的函数做 CPS 变换;但是 Kotlin 编译器并没有把代码转换成函数回调的形式,而是利用状态机模型(消除回调地狱, 解决栈空间占用问题)。
invokeSuspend() 方法就是对协程代码块的封装,内部加入状态机机制将整个逻辑分为多块,分隔点就是每个挂起点。
协程启动时会先调用一次 invokeSuspend() 函数触发协程体的开始执行,后面每当调用到一个挂起函数时,挂起函数会返回 COROUTINE_SUSPENDED 标识,从而 return 停掉 invokeSuspend() 函数的执行,即非阻塞挂起。编译器会为挂起函数自动添加一个 continuation 续体对象参数,表示调用它的那个协程代码块,在该挂起函数执行完成后,就会调用到续体对象的 continuation.resumeWith() 方法来返回结果(或异常),而在 resumeWith() 中又调用了 invokeSuspend() 方法,其内根据状态机的状态来恢复协程的执行。
Kotlin 协程中存在三层包装,每层包装都持有上层包装的引用,用来执行其 resumeWith() 方法做一些处理:
-
第一层包装: launch & async 返回的 Job, Deferred 继承自 AbstractCoroutine, 里面封装了协程的状态,提供了 cancel 等接口;
-
第二层包装: 编译器生成的 SuspendLambda 子类,封装了协程的真正执行逻辑,其继承关系为 SuspendLambda <- ContinuationImpl <- BaseContinuationImpl, 它的 completion 参数就是第一层包装实例;
-
第三层包装: DispatchedContinuation, 封装了线程调度逻辑,它的 continuation 参数就是第二层包装实例。
这三层包装都实现了 Continuation 续体接口,通过代理模式将协程的各层包装组合在一起,每层负责不同的功能。
下图的 resumeWith() 可能表示 resume(), 也可能表示 resumeCancellableWith() 等系列方法:
witchContext()源码追踪
在 Android 中使用 withContext(),一般目的是把任务从主线程调度器切到工作线程调度器(如 Dispatchers.IO)执行,或者是从工作线程调度器切回主线程调度器 Dispatchers.Main 。
withContext() 中的代码块是调用了 suspendCoroutineUninterceptedOrReturn 方法来执行的:
public suspend fun <T> withContext(
context: CoroutineContext,
block: suspend CoroutineScope.() -> T
): T
contract
callsInPlace(block, InvocationKind.EXACTLY_ONCE)
return suspendCoroutineUninterceptedOrReturn sc@ uCont ->
// 拷贝旧的协程上下文计算生成新的协程上下文
val oldContext = uCont.context
// Copy CopyableThreadContextElement if necessary
val newContext = oldContext.newCoroutineContext(context)
// 检查Job是否是已取消状态
newContext.ensureActive()
// 快路径 #1 -- 新的协程上下文跟旧的协程上下文是同一个
if (newContext === oldContext)
// 创建作用域协程
val coroutine = ScopeCoroutine(newContext, uCont)
// 不分发,直接启动执行协程代码
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
// 快路径 #2 -- 新旧协程上下文中调度器没有改变,直接进行
// `equals` is used by design (see equals implementation is wrapper context like ExecutorCoroutineDispatcher)
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor])
val coroutine = UndispatchedCoroutine(newContext, uCont)
// 在新的协程上下文中以不分发的模式启动协程
withCoroutineContext(newContext, null)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
// 慢路径 -- 新旧协程上下文中的调度器不一样,则使用新的调度器分发,以新的协程上下文创建协程
val coroutine = DispatchedCoroutine(newContext, uCont)
block.startCoroutineCancellable(coroutine, coroutine) // 以可取消的形式执行新协程
coroutine.getResult() // 获取协程执行结果(挂起协程)
在 suspendCoroutineUninterceptedOrReturn() 代码块中,可以获取一个 Continuation 实例 uCont,这个 Continuation 实例就是 withContext() 代码块对应的 SuspendLambda 匿名内部类。
在这个代码块中,首先会尝试把旧的上下文与新的上下文进行结合,所谓的结合,主要就是替换掉上下文中 Key 为 ContinuationInterceptor 的元素,比如这个元素原来是默认协程分发器,把它换成 IO 协程分发器。
如果新的上下文和旧的上下文没有区别的话,就创建一个作用域协程 ScopeCoroutine ,并调用它的 startUndipsatchedOrReturn() 扩展函数启动协程。
如果新的协程上下文和旧的上下文的调度器相同的话,那就创建一个 UndispatchdCoroutine ,并调用它的 startUndipsatchedOrReturn() 扩展函数启动协程。
如果新的协程上下文和旧的协程上下文的调度器不相同的话,那就使用新的协程上下文(包含新的调度器)创建一个 DispatchedCoroutine 新实例,并调用它的 startCoroutineCancellable() 扩展函数,以可取消的形式启动新的协程,这个方法会把父协程挂起,也就是父协程代码块中的代码执行到 withContext() 后就不会继续执行,直到 withContext() 方法返回执行结果后,BaseContinuationImpl 中把值传给 completion ,completion 就是 DispatchedCoroutine ,然后 DispatchedCoroutine 就会把状态切换为 RESUMED 恢复执行。
startCoroutineCancellable()函数在前面分析 launch 方法已经分析过了,launch 与 withContext 在调用这个方法时唯一不同的点就是 completion 不同,在 launch 中 completion 是 StandaloneCoroutine 实例,而在这里 withContext 中 completion 是 DispatchedCoroutine 实例。
DispatchedCoroutine 继承自 ScopeCoroutine
internal open class ScopeCoroutine<in T>(
context: CoroutineContext,
@JvmField val uCont: Continuation<T> // SuspendLambda
) : AbstractCoroutine<T>(context, true, true), CoroutineStackFrame
final override val callerFrame: CoroutineStackFrame? get() = uCont as? CoroutineStackFrame
final override fun getStackTraceElement(): StackTraceElement? = null
// 返回true表示这是一个有作用域的协程
final override val isScopedCoroutine: Boolean get() = true
internal val parent: Job? get() = parentHandle?.parent
// Job完成回调
override fun afterCompletion(state: Any?)
// 以可取消的方式从其他上下文中进行resume
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
// 协程完成回调
override fun afterResume(state: Any?)
// 返回结果给SuspendLambda
uCont.resumeWith(recoverResult(state, uCont))
ScopeCooutine 继承了 AbstractCoroutine ,重写了 JobSupport 的 isScopedCoroutine 常量,重写了 JobSupport 的 afterCompletion() 和 AbstractCoroutine 的 afterResume() 方法。
在 ScopeCoroutine 的 afterCompletion() 方法中,在当前协程的工作完成或取消后,就会调用 uCont 的 intercepted() 方法,让协程分发器把它封装为 DispatchedContinuation ,并把结果传给这个 DispatchedContinuation ,uCont 就是 withContext() 代码块对应的 SuspendLambda 。
当 ScopeCoroutine 的 afterResume() 方法被调用时,它会调用 uCont 成员的 resumeWith() 方法 把结果回调给 SuspendLambda。
// Used by withContext when context dispatcher changes
internal class DispatchedCoroutine<in T>(
context: CoroutineContext,
uCont: Continuation<T> // SuspendLambda
) : ScopeCoroutine<T>(context, uCont)
// 状态机
private val _decision = atomic(UNDECIDED)
...
// Job完成后,恢复协程的运行
override fun afterCompletion(state: Any?)
afterResume(state)
override fun afterResume(state: Any?)
if (tryResume()) return // 尝试恢复协程的运行
// 恢复失败,说明协程处于挂起状态,通过调度器执行协程代码块
uCont.intercepted().resumeCancellableWith(recoverResult(state, uCont))
// 执行获取结果
fun getResult(): Any?
// 尝试挂起协程,挂起成功后返回挂起标志
if (trySuspend()) return COROUTINE_SUSPENDED
// 挂起失败
val state = this.state.unboxState()
if (state is CompletedExceptionally) throw state.cause
@Suppress("UNCHECKED_CAST")
return state as T
DispatchedCoroutine 有一个和 DispatchedContinuation 一样简单的决策状态机,这个状态机有 UNDECIDED、SUSPENDED 和 RESUMED 三种状态。
DispatchedCoroutine 重写了 AbstractCoroutine 的 afterResume() 方法,这个方法会在 resumeWith() 方法被调用的时候调用,协程的 resumeWith() 方法一般是在 SuspendLambda(代码块)执行完后调用的。
DispatchedCoroutine 的 afterResume() 方法首先会尝试把状态迁移到 RESUMED ,如果迁移失败的话,说明协程处于挂起状态,这时就要通过协程分发器再次把 SuspendLambda 封装为任务并进行分发。
在 DispatchedCoroutine 的 getResult() 方法中,会尝试把状态迁移到 SUSPENDED ,迁移成功则返回挂起标志,如果迁移失败的话,说明处于 RESUMED 状态,也就是已经获取到执行结果了,这时就不用再挂起了,直接把状态拆箱并返回。
unboxState() 扩展函数只是简单地判断了一下当前状态是否为未完成状态 IncompleteStateBox ,如果是的话,则返回它的 state 成员,否则返回当前状态。
参考:
以上是关于深入理解Kotlin协程CoroutineScope.launch源码追踪扒皮的主要内容,如果未能解决你的问题,请参考以下文章
深入理解Kotlin协程协程调度器Dispatchers源码追踪扒皮
深入理解Kotlin协程协程的上下文 CoroutineContext