kotlin协程硬核解读(6. 协程调度器实现原理)
Posted open-Xu
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了kotlin协程硬核解读(6. 协程调度器实现原理)相关的知识,希望对你有一定的参考价值。
版权声明:本文为openXu原创文章【openXu的博客】,未经博主允许不得以任何形式转载
文章目录
在《kotlin协程硬核解读(4. 协程的创建和启动流程分析)》一文中,我们分析了协程自创建器创建协程对象到开始执行协程代码块中代码的整个过程,文章最后总结了协程有3层包装:
- 第一层是通过协程构建器创建的
AbstractCoroutine
子类类型的协程对象,它的作用是维护了协程的上下文 - 第二层是编译期生成的
SuspendLambda
的子类对象,封装了协程代码块中的代码和执行逻辑 - 第三层是
DispatchedContinuation
,封装了协程的线程调度,也就是决定协程代码块中的代码是在那个线程上执行的
在kotlin协程中,协程的执行涉及到3次线程切换,分别是:
- 切换到指定线程执行协程代码块中的代码
- 当协程代码块调用到异步挂起函数时,切换到指定线程执行挂起函数
- 当异步挂起函数执行完毕,将函数执行结果Result对象切换到协程所在的线程,继续执行协程代码块中剩下的代码
这些线程切换的动作都是通过协程调度器来实现的,本文将对协程调度器做详细讲解。
1. 相关类介绍
1.1 ContinuationInterceptor续体拦截器
在《kotlin协程硬核解读(3. suspend挂起函数&挂起和恢复的实现原理)》一文的2.3 SuspendLambda节我们讲过续体Continuation
相关的类,其第二层实现类ContinuationImpl
(第一层是BaseContinuationImpl
)主要就是增加了续体拦截器的功能函数intercepted()
,从协程上下文中获取Key
为ContinuationInterceptor
的续体拦截器上下文对象。续体拦截器的作用就是将原始续体对象包装为另一种续体类型的对象,从而增强原续体的功能。
//续体拦截器,它是一个协程上下文元素
public interface ContinuationInterceptor : CoroutineContext.Element
//续体拦截器的键
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
//拦截续体,将原始续体(根据前面文章的分析,原始续体通常就是SuspendLambda的子类对象)转换为另一种续体子类类型
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
1.2 Dispatchers调度器
kotlin协程库中有4种调度器,它们都是CoroutineDispatcher
的子类对象,而CoroutineDispatcher
又实现了ContinuationInterceptor
,所以调度器是通过续体拦截器实现的,每个调度器对象是上下文元素同时又是一个续体拦截器:
//Dispatchers单例对象,它包含4中调度器对象
public actual object Dispatchers
//4种调度器,都是CoroutineDispatcher的子类对象,CoroutineDispatcher是通过续体拦截器实现的。actual表示与平台相关,不同平台实现不同
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
public val IO: CoroutineDispatcher = DefaultScheduler.IO
//协程调度器抽象类,定义了切换线程的相关方法
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor
public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>(
ContinuationInterceptor, it as? CoroutineDispatcher )
//注意是final修饰的:所有的调度器(续体拦截器)都将原续体包装为DispatchedContinuation类型
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
//四种调度器都分别实现下面两个函数,确保在不同的线程调度
//判断是否需要切换线程,默认为true,表示需要切换线程,具体的调度器需要根绝情况重写
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
//调度:切换线程执行block
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
//类关系图
CoroutineContext
.Element //上下文元素
|-- ContinuationInterceptor //续体拦截器:将续体包装为另一种续体类型
|-- CoroutineDispatcher //协程调度器抽象类:实现了拦截器功能,将原续体对象包装为DispatchedContinuation类型
//四种调度器,实现了CoroutineDispatcher,不同的平台实现不同
|-- Dispatchers.Default //
|-- Dispatchers.Main //
|-- Dispatchers.Unconfined//
|-- Dispatchers.IO //
2. 协程的3次线程调度
2.1 第一次线程调度:切换到指定线程开始执行协程代码
在《kotlin协程硬核解读(4. 协程的创建和启动流程分析)》一文中的第2.1章节,当使用协程构建器创建一个协程对象时,会在newCoroutineContext(context)
函数中检查作用域的上下文中是否存在key为ContinuationInterceptor
的元素,如果不存在将会添加一个Dispatchers.Default
对象,所以协程的上下文中必定会有一个ContinuationInterceptor的元素,也就是线程调度器,来明确协程代码在哪个线程上执行:
//☆ kotlin-stlib.jar
package kotlin.coroutines.jvm.internal
//续体的实现,增加了拦截功能
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?, private val _context: CoroutineContext?) : BaseContinuationImpl(completion)
public override val context: CoroutineContext
get() = _context!!
private var intercepted: Continuation<Any?>? = null //被拦截的原始续体对象(SuspendLambda子类对象)
//3.1 从上下文中获取拦截器,实现续体对象包装
public fun intercepted(): Continuation<Any?> =
intercepted?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
.also intercepted = it
...
在启动协程之前,会调用createCoroutineUnintercepted(receiver, completion)
创建SuspendLambda
子类类型的原始续体对象,然后调用续体的intercepted()
将其包装为DispatchedContinuation
类型,最后调用resumeCancellableWith()
启动协程。resumeCancellableWith()
函数中使用调度器对象dispatcher判断是否需要切换线程,如果需要则调用dispatcher.dispatch()实现线程切换:
//☆ kotlin-coroutines-core-jvm.jar
//☆ CoroutineContext.kt
package kotlinx.coroutines
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
//★1. 如果上下文中不存在ContinuationInterceptor类型的元素,添加默认的线程调度器Dispatchers.Default
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
debug + Dispatchers.Default else debug
//☆ Cancellable.kt
package kotlinx.coroutines.intrinsics
//启动协程
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
receiver: R, completion: Continuation<T>, onCancellation: ((cause: Throwable) -> Unit)? = null) =
runSafely(completion)
createCoroutineUnintercepted(receiver, completion) //★2. 重新创建续体对象(SuspendLambda子类对象)
.intercepted() //★3. 从协程上下文中获取调度器拦截原续体,将其包装为DispatchedContinuation类型
.resumeCancellableWith(Result.success(Unit), onCancellation) //★4. 启动协程
//☆ DispatchedContinuation.kt
package kotlinx.coroutines.internal
internal class DispatchedContinuation<in T>( //调度器续体包装类
@JvmField val dispatcher: CoroutineDispatcher, //持有调度器对象
@JvmField val continuation: Continuation<T> //持有原始续体对象
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>, onCancellation: ((cause: Throwable) -> Unit)? = null): Unit = when (this)
//★5. 如果续体类型是DispatchedContinuation,调用resumeCancellableWith()启动协程
is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
else -> resumeWith(result)
inline fun resumeCancellableWith(result: Result<T>, noinline onCancellation: ((cause: Throwable) -> Unit)?)
val state = result.toState(onCancellation)
if (dispatcher.isDispatchNeeded(context)) //★6. 判断是否需要切换线程
_state = state
resumeMode = MODE_CANCELLABLE
dispatcher.dispatch(context, this) //★7. 切换线程后开始执行协程代码
else
executeUnconfined(state, MODE_CANCELLABLE)
if (!resumeCancelled(state))
resumeUndispatchedWith(result)
2.2 第二次线程调度:切换线程执行异步挂起函数
在《kotlin协程硬核解读(3. suspend挂起函数&挂起和恢复的实现原理)》一文中讲解了怎样自定义异步挂起函数,要想挂起函数相对于协程异步执行,有两种方式:
//方式1:调用suspendCancellableCoroutine()定义挂起函数,直接在函数中开启Thread,在子线程中执行函数体
suspend fun getUser(): User = suspendCancellableCoroutine
continuation ->
Thread //也可以通过线程池异步执行,retrofit对协程的支持网络请求就是通过okhttp的异步请求实现的(线程池)
...
continuation.resume(User("openXu"))
.start()
//方式2:调用withContext(...)定义挂起函数,通过传入调度器上下文实现线程切换
suspend fun getUser(): User = withContext(Dispatchers.IO)
...
User("openXu") //子协程返回值
第一种方式没什么说的,不管是通过线程池还是Thread,都能将函数体切换到子线程执行。第二种方式withContext(Dispatchers.IO)
当判断传入的调度器和父协程调度器不同时,会创建一个DispatchedCoroutine
类型的子协程,函数体就是子协程的挂起代码块,当执行子协程时,又会走2.1的步骤切换到指定线程开始执行子协程代码:
public suspend fun <T> withContext(
context: CoroutineContext, //协程上下文,一般情况下传递一个调度器
block: suspend CoroutineScope.() -> T //子协程代码块
): T
...
return suspendCoroutineUninterceptedOrReturn sc@ uCont ->
val oldContext = uCont.context //父协程的上下文
val newContext = oldContext + context //父协程上下文+新的调度器
...
if (newContext === oldContext)
//新的上下文和父协程上下文地址相同,不需要切换线程,将创建一个ScopeCoroutine类型的子协程
val coroutine = ScopeCoroutine(newContext, uCont)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
// 如果传入的调度器和父协程调度器相同,不需要切换线程,将创建一个UndispatchedCoroutine类型(ScopeCoroutine子类)的子协程
if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor])
val coroutine = UndispatchedCoroutine(newContext, uCont)
withCoroutineContext(newContext, null)
return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
//★上面两种不需要切换线程的就没必要跟踪了。当调度器不同时创建一个DispatchedCoroutine类型的子协程,切换到新线程执行子协程代码块(也就是函数体)
val coroutine = DispatchedCoroutine(newContext, uCont)
coroutine.initParentJob()
//★★★ 接2.1的startCoroutineCancellable(),创建子SuspendLambda续体对象,然后包装为DispatchedContinuation类型,最后dispatcher.dispatch()切换到新线程开始执行函数体
block.startCoroutineCancellable(coroutine, coroutine)
coroutine.getResult()
2.3第三次线程调度:异步挂起函数执行完毕后将函数执行结果切回协程所在线程并恢复协程执行
执行异步挂起函数时切换到新的协程执行函数有两种方式,当函数执行完毕切回到协程所在线程同样有两种方式。
对于第一种在函数体内通过线程池或者Thread的方式,当函数执行完毕会调用续体continuation
的resumeWith()
方法恢复协程的执行,而continuation
是协程被包装后的DispatchedContinuation
类型,所以就是调用DispatchedContinuation
的resumeWith()
。
第二种通过withContext()创建了一个子协程,根据《kotlin协程硬核解读(3. suspend挂起函数&挂起和恢复的实现原理》一文中<2.5 挂起、恢复实现原理源码解读>,协程代码块的代码会通过状态机被分为多个执行部分放入的SuspendLambda.invokeSuspend()
函数中,invokeSuspend()
又是在子协程的续体BaseContinuationImpl.resumeWith()
中调用的,所以子协程代码块的返回值将作为最后一次调用invokeSuspend()
函数的返回值,在BaseContinuationImpl.resumeWith()
函数中这个返回值将被传递给子协程的AbstractCoroutine.resumeWith()
函数作为子协程的返回值。withContext()
创建的子协程DispatchedCoroutine
重写了afterResume()
函数,该函数中调用父协程的续体的intercepted()
函数再次将父协程续体包装为DispatchedContinuation
类型,然后调用resumeCancellableWith()
切换线程,将挂起函数的结果返回并恢复父协程执行。
//所有协程对象的抽象父类
public abstract class AbstractCoroutine<in T>(protected val parentContext: CoroutineContext, active: Boolean = true)
: JobSupport(active), Job, Continuation<T>, CoroutineScope
public final override fun resumeWith(result: Result<T>)
val state = makeCompletingOnce(result.toState()) //尝试设置当前协程状态为已完成
if (state === COMPLETING_WAITING_CHILDREN) return
afterResume(state)
protected open fun afterResume(state: Any?): Unit = afterCompletion(state) //afterCompletion()是一个空实现,因为普通的协程代码块不需要返回值
//withContext()创建的调度器子协程类型
private class DispatchedCoroutine<in T>(
context: CoroutineContext, //父协程上下文+新的调度器
uCont: Continuation<T> //★ 持有父协程的续体对象,用于将子协程的执行结果切回到父协程的执行线程
) : ScopeCoroutine<T>(context, uCont)
...
//重写afterResume()函数,因为withContext()函数是有返回值的,子协程代码块的返回值将作为withContext()函数的返回值
override fun afterResume(state: Any?)
// 通过父协程的续体,切换回父协程所在的线程恢复父协程执行
uCont
.intercepted() //★ 调用父协程的续体的intercepted(),这个函数将从父协程上下文中获取父协程的调度器,并将父协程续体再次包装为DispatchedContinuation类型
.resumeCancellableWith(recoverResult(state, uCont)) //恢复父协程执行,并将子协程的返回值传给父协程(也就是withContext()函数的返回值)
2.4 小结
通过上面的对协程执行过程中的3次线程切换的源码跟踪,发现协程中的线程切换都会调用DispatchedContinuation
的resumeWith()
或者resumeCancellableWith()
函数,通过调度器对象dispatcher.isDispatchNeeded()判断是否需要切换线程,如果需要则调用dispatcher.dispatch()实现线程切换,然后在切换后的线程上执行Runnable调用原续体的resumeWith()从而触发invokeSuspend()启动或恢复协程执行:
//调度器续体,协程的第3层包装(第一层是协程对象,第二层是SuspendLambda)
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T> //原续体对象,可能是SuspendLambda,也可能是DispatchedContinuation
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation
/**
* 重写了续体的resumeWith()函数,在原有的恢复协程执行的基础上增加了线程切换功能,该方法会从挂起函数执行线程 切回到 协程调度线程
* resumeWith():挂起函数执行完毕后,通过调用resume系列函数恢复函数执行结果或者异常最终都会调用到该方法,恢复协程执行
* resumeCancellableWith():通常是启动协程、子协程执行完毕返回结果恢复父协程执行时调用
* resumeCancellableWith()和resumeWith()实现差不多,就多了一个是否cancel的判断,这里就不粘贴代码了
*/
override fun resumeWith(result: Result<T>)
...
//★1. 判断是否需要切换线程
if (dispatcher.isDispatchNeeded(context))
_state = state
resumeMode = MODE_ATOMIC
//★2. 调度器切换线程,然后执行this,也就是在切换后的线程上执行下面的run()方法
dispatcher.dispatch(context, this)
else
//不需要切换线程时,直接调用原续体的resumeWith()
executeUnconfined(state, MODE_ATOMIC)
withCoroutineContext(this.context, countOrElement)
continuation.resumeWith(result)
//this就是代理续体
override val delegate: Continuation<T>
get() = this
//DispatchedTask实现了Runnable,DispatchedContinuation继承了它,第2步切换线程时传入的this就表示在指定的线程执行run方法(已经切换线程后执行)
public final override fun run()
...
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation //原续体对象
val context = continuation.context
val state = takeState() // NOTE: Must take state in any case, even if cancelled
withCoroutineContext(context, delegate.countOrElement)
val exception = getExceptionalResult(state)
val job = if (exception == null && resumeMode.isCancellableMode) context[Job] else null
//★3. 在切换后的线程中调用原续体continuation的resumeWith()从而触发invokeSuspend()启动或恢复协程执行
if (job != null && !job.isActive)
val cause = job.getCancellationException()
cancelCompletedResult(state, cause)
continuation.resumeWithStackTrace(cause) //
else
if (exception != null)
continuation.resumeWithException(exception) //
else
continuation.resume(getSuccessfulResult(state))//
...
3. 调度器实现线程切换的原理
3.1 调度器的平台实现
协程库中有一个Dispatchers
单例对象,该对象包含协程的4中调度器成员变量,这些变量都是协程调度器CoroutineDispatcher
及其子类的对象:
//Dispatchers单例对象,它包含4中调度器对象
public actual object Dispatchers
//4种调度器,都是CoroutineDispatcher的子类对象,CoroutineDispatcher是通过续体拦截器实现的。actual表示与平台相关,不同平台实现不同
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined
public val IO: CoroutineDispatcher = DefaultScheduler.IO
注意观察除了IO
外,Default
、Main
、Unconfined
都被actual
修饰,actual
是kotlin的关键字,表示平台相关的实现。我们知道kotlin是一个跨平台的语言,它可以被编译成各种平台的可执行指令,比如Jvm、android、js、Native、ios、Linux、Windows等,在不同的平台上实现有些API难免会有差异。比如一般情况下带有UI相关的平台才会强调主线程,主线程专门用于刷新UI不能进行耗时操作导致UI卡顿,而其他平台如普通jvm上不强调主线程(也就不需要主线程调度器),所以对于Main
调度器的实现在不同平台上实现是不一样的。如果我们在普通jvm平台程序中使用Dispatchers.Main
会抛异常,说缺少具有Main调度器的模块,需要添加提供Main调度器的依赖,比如kotlinx-coroutines-android扩展包:
fun main()
runBlocking
//普通jvm平台没有Main调度器,测试中可以添加kotlinx-coroutines-test.jar依赖
withContext(Dispatchers.Main)
运行结果:
Exception in thread "main" java.lang.IllegalStateException: Module with the Main dispatcher is missing. Add dependency providing the Main dispatcher, e.gkotlin协程硬核解读(6. 协程调度器实现原理)