深入理解Kotlin协程协程调度器Dispatchers源码追踪扒皮
Posted 川峰
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解Kotlin协程协程调度器Dispatchers源码追踪扒皮相关的知识,希望对你有一定的参考价值。
android中的Kotlin协程调度器主要有四种:Dispatchers.Main、Dispatchers.Default、Dispatchers.IO、Dispatchers.Unconfined,第四种平时开发应该一般不会用到,所以主要了解一下前3种的实现。Dispatchers.Main
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
// MainDispatchers.kt
internal object MainDispatcherLoader
private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
private fun loadMainDispatcher(): MainCoroutineDispatcher
return try
val factories = if (FAST_SERVICE_LOADER_ENABLED)
FastServiceLoader.loadMainDispatcherFactory()
else
// We are explicitly using the
// `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
// form of the ServiceLoader call to enable R8 optimization when compiled on Android.
ServiceLoader.load(
MainDispatcherFactory::class.java,
MainDispatcherFactory::class.java.classLoader
).iterator().asSequence().toList()
@Suppress("ConstantConditionIf")
factories.maxByOrNull it.loadPriority ?.tryCreateDispatcher(factories)
?: createMissingDispatcher()
catch (e: Throwable)
// Service loader can throw an exception as well
createMissingDispatcher(e)
可以看到Dispatchers.Main就是单例对象MainDispatcherLoader.loadMainDispatcher()方法的返回值,该方法会通过MainDispatcherFactory去创建一个MainCoroutineDispatcher对象。因此Dispatchers.Main属于MainCoroutineDispatcher类型。MainDispatcherFactory是一个抽象接口,它的实现类是AndroidDispatcherFactory,如下:
internal class AndroidDispatcherFactory : MainDispatcherFactory
override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher
val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
return HandlerContext(mainLooper.asHandler(async = true))
override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
override val loadPriority: Int
get() = Int.MAX_VALUE / 2
很明显这里createDispatcher方法创建了一个HandlerContext对象返回,并且我们注意到,它使用主线程的Looper对象来创建的Handler。
事实上 Dispatchers.Main 是一个多平台化的 API,在 Android、JavaFX、Swing等场景下实现的细节都不同,我们用 Intelij IDEA 打开 kotlinx.coroutines 项目的源码,这三者的实现都位于 ui 这个目录下。我们在 kotlinx.coroutines.android 包下找到了 HandlerDispatcher.kt 这个代码文件,HandlerContext的具体实现就在其中。 因此,我们可以得到一个结论就是:在 Android 平台上 Dispatchers.Main 就是一个 HandlerContext 对象。 HandlerContext的主要源码如下:// HandlerDispatcher.kt
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay
constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also _immediate = it
override fun isDispatchNeeded(context: CoroutineContext): Boolean
return !invokeImmediately || Looper.myLooper() != handler.looper
override fun dispatch(context: CoroutineContext, block: Runnable)
if (!handler.post(block))
cancelOnRejection(context, block)
...
可以看到它的 dispatch() 也就是线程调度方法,其实就是通过Handler.post()将任务发送到主线程的消息队列中等待执行,为什么是主线程,因为前面看到工厂类创建它的时候使用的Looper.MainLooper。所以kotlin协程在Android平台上的实现还是离不开老本行Handler消息循环机制,因为Android的主线程UI刷新就是基于Handler消息循环机制。
注意到,HandlerContext 是 HandlerDispatcher(密封类) 的子类,而 HandlerDispatcher 是 MainCoroutineDispatcher 的子类,MainCoroutineDispatcher 又是 CoroutineDispatcher 的子类:
public sealed class HandlerDispatcher : MainCoroutineDispatcher(), Delay
public abstract class MainCoroutineDispatcher : CoroutineDispatcher()
CoroutineDispatcher 是所有调度器的抽象基类,它是AbstractCoroutineContextElement的子类,也就是协程上下文的抽象子类。所以说调度器可以用 + 运算符添加到context中。
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor
...
public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
public abstract fun dispatch(context: CoroutineContext, block: Runnable)
...
CoroutineDispatcher 里面有两个重要的方法 isDispatchNeeded() 和 dispatch():
- isDispatchNeeded():如果协程的执行应该使用 dispatch() 方法执行,则返回 true。大多数调度程序的默认行为是返回 true。
- dispatch():将可运行的 block 的执行分派到给定 context 中的另一个线程。
public interface ContinuationInterceptor : CoroutineContext.Element
...
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
...
在 CoroutineDispatcher 中这个方法的实现如下:
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor
...
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation)
...
它返回的是一个DispatchedContinuation对象:
internal class DispatchedContinuation<in T>(
@JvmField val dispatcher: CoroutineDispatcher,
@JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation
...
override fun resumeWith(result: Result<T>)
val context = continuation.context
val state = result.toState()
if (dispatcher.isDispatchNeeded(context))
_state = state
resumeMode = MODE_ATOMIC
dispatcher.dispatch(context, this)
else
executeUnconfined(state, MODE_ATOMIC)
withCoroutineContext(this.context, countOrElement)
continuation.resumeWith(result)
...
上面在DispatchedContinuation类的resumeWith()方法中,先判断了isDispatchNeeded()的值,如果是true就调用调度器dispatcher.dispatch()方法进行调度,并将当前DispatchedContinuation对象自身作为this参数传入。
Dispatchers.Main.immediate
我们知道,lifecycleScope 和 viewModelScope 的协程上下文都是 SupervisorJob + Dispatchers.Main.immediate。 按照前面的理解Main已经是主线程了,那么immediate又是什么?因为 Dispatchers.Main 是 HandlerContext, 所以 Dispatchers.Main.immediate 就是 HandlerContext.immediate 。再仔细的看一下 HandlerContext 类:internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay
...
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also _immediate = it
override fun isDispatchNeeded(context: CoroutineContext): Boolean
return !invokeImmediately || Looper.myLooper() != handler.looper
override fun dispatch(context: CoroutineContext, block: Runnable)
if (!handler.post(block))
cancelOnRejection(context, block)
...
这里HandlerContext.immediate仍然是创建的自身类型。因此可以得知 Dispatchers.Main.immediate 的真相是一个第三个参数为 true 的 HandlerContext 对象。
同时,可以关注到 HandlerContext 类的 isDispatchNeeded() 方法的判断逻辑是 !invokeImmediately || Looper.myLooper() != handler.looper 返回true的时候才会进行调度dispatch()。而在构造时第3个参数传入的值为true,也就是说 invokeImmediately = true,因此判断逻辑主要取决于后半部分 Looper.myLooper() != handler.looper,而前面分析过 HandlerContext 的 handler 成员变量在构造 HandlerContext 时传入的是 持有主线程Looper的Hanlder对象,也就是说,isDispatchNeeded() 判断逻辑就可以简化成:如果当前线程不是主线程,则调用dispatch()进行调度,否则就不用调度。 至此,可以得到结论是:Dispatchers.Main.immediate 这种调度器与 Dispatchers.Main 的唯一区别是前者只需要简单的判断当前是不是主线程,来决定是否执行调度。 因此,Dispatchers.Main.immediate 的含义就是立即在主线程上执行。Dispatchers.Default
@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler
internal object DefaultScheduler : SchedulerCoroutineDispatcher(
CORE_POOL_SIZE, MAX_POOL_SIZE,
IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
)
...
override fun toString(): String = "Dispatchers.Default"
internal open class SchedulerCoroutineDispatcher(
private val corePoolSize: Int = CORE_POOL_SIZE,
private val maxPoolSize: Int = MAX_POOL_SIZE,
private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher()
override val executor: Executor
get() = coroutineScheduler
private var coroutineScheduler = createScheduler()
private fun createScheduler() =
CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)
override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)
...
DefaultsScheduler是一个SchedulerCoroutineDispatcher类型,SchedulerCoroutineDispatcher类中它真正创建的是一个CoroutineScheduler对象:
internal class CoroutineScheduler(
@JvmField val corePoolSize: Int,
@JvmField val maxPoolSize: Int,
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable
init
require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE)
"Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
require(maxPoolSize >= corePoolSize)
"Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE)
"Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
require(idleWorkerKeepAliveNs > 0)
"Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
...
CoroutineScheduler类的代码比较多,但是我们根据这些参数知道它就是一个线程池,是一个专门自定义的线程池,而没有使用Java版本的ThreadPoolExecutor。
对于线程池一般而言最重要的就是研究它的几个配置参数,来看一下构造SchedulerCoroutineDispatcher时传入的几个参数:@JvmField
internal val CORE_POOL_SIZE = systemProp(
"kotlinx.coroutines.scheduler.core.pool.size",
AVAILABLE_PROCESSORS.coerceAtLeast(2),
minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE
)
@JvmField
internal val MAX_POOL_SIZE = systemProp(
"kotlinx.coroutines.scheduler.max.pool.size",
CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE,
maxValue = CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE
)
// number of processors at startup for consistent prop initialization
internal val AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors()
// Masks of control state
private const val BLOCKING_SHIFT = 21 // 2M threads max
...
internal const val MIN_SUPPORTED_POOL_SIZE = 1
internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2
@JvmField
internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 60L)
)
// Internal debuggability name + thread name prefixes
internal const val DEFAULT_SCHEDULER_NAME = "DefaultDispatcher"
总结一下就是:
- corePoolSize = CORE_POOL_SIZE, 核心线程数,最小值为 2 最大值为 Runtime.getRuntime().availableProcessors() 即CPU的核心数量
- maxPoolSize = MAX_POOL_SIZE, 最大线程数,值为 (1 << 21) - 2,其中 1 << 21 大小正好是 2M,所以是 2M - 2 = 2097150(两百多万个)
- idleWorkerKeepAliveNs = Long = IDLE_WORKER_KEEP_ALIVE_NS, 空闲任务的存活时间,值为60s
- schedulerName = DEFAULT_SCHEDULER_NAME, 默认的调度器名字,值为"DefaultDispatcher"
Dispatchers.IO
@JvmStatic
public val IO: CoroutineDispatcher = Defaultioscheduler
// Dispatchers.IO
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor
private val default = UnlimitedIoScheduler.limitedParallelism(
systemProp(
IO_PARALLELISM_PROPERTY_NAME,
64.coerceAtLeast(AVAILABLE_PROCESSORS)
)
)
override val executor: Executor
get() = this
override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command)
override fun dispatch(context: CoroutineContext, block: Runnable)
default.dispatch(context, block)
...
dispatch()方法调用了default.dispatch(),而default是由UnlimitedIoScheduler.limitedParallelism()方法返回一个LimitedDispatcher对象:
public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher
parallelism.checkParallelism()
return LimitedDispatcher(this, parallelism)
internal class LimitedDispatcher(
private val dispatcher: CoroutineDispatcher,
private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay)
....
override fun dispatch(context: CoroutineContext, block: Runnable)
dispatchInternal(block)
dispatcher.dispatch(this, this)
LimitedDispatcher内部也只是包装了传入的UnlimitedIoScheduler对象而已,而在UnlimitedIoScheduler中仍然是调用的DefaultScheduler.dispatchWithContext方法。
// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides
private object UnlimitedIoScheduler : CoroutineDispatcher()
@InternalCoroutinesApi
override fun dispatchYield(context: CoroutineContext, block: Runnable)
DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
override fun dispatch(context: CoroutineContext, block: Runnable)
DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
因此,至此我们得到一个结论就是: Dispatchers.IO 跟 Dispatchers.Default 是共享同一个线程池的。
那 Dispatchers.IO 跟 Dispatchers.Default 有什么不同呢?我们看到在LimitedDispatcher的构造函数中传入了一个parallelism参数,它表示并发任务的执行数量。这个参数的值是在前面调用UnlimitedIoScheduler.limitedParallelism()时设置的:private val default = UnlimitedIoScheduler.limitedParallelism(
systemProp(
IO_PARALLELISM_PROPERTY_NAME,
64.coerceAtLeast(AVAILABLE_PROCESSORS)
)
)
public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.parallelism"
这里它传入的值为 64 和 Runtime.getRuntime().availableProcessors() 之间的最大值,也就是说并发的协程任务数被控制在 [N*cpu, 64] 或 [64, N*cpu] 的范围内。
这里为什么要限制
Dispatchers.IO 这种调度器的并发任务执行数量呢?
由于 IO 任务通常会阻塞实际执行任务的线程,在阻塞过程中线程虽然不占用 CPU, 但却占用了大量的内存,这段时间内被 IO 任务占据线程实际上是资源使用不合理的表现,因此 IO 调度器对于 IO 任务的并发做了限制,避免过多的 IO 任务并发占用过多的系统资源,同时在调度时为任务打上 PROBABLY BLOCKING 标签,以方便线程池在执行任务调度时对阻塞任务和非阻塞任务区别对待。
最后再来个总结:
- Dispatchers.Main :在 Android 平台上 Dispatchers.Main 就是一个 HandlerContext 对象。
- Dispatchers.Main.immediate:与 Dispatchers.Main 的唯一区别就是它的调度执行只需要简单的判断当前是不是主线程。
- Dispatchers.Default:使用自定义的线程池,核心线程数最小为2最大为CPU核心数,最大线程数2M左右,空闲任务存活时间为60s
- Dispatchers.IO :与 Dispatchers.Default 是共用同一个线程池的,但是增加了对并发任务的数量控制。
以上是关于深入理解Kotlin协程协程调度器Dispatchers源码追踪扒皮的主要内容,如果未能解决你的问题,请参考以下文章
Kotlin 协程协程底层实现 ② ( 协程调度器 | 协程任务泄漏 | 结构化并发 )
Kotlin 协程协程底层实现 ② ( 协程调度器 | 协程任务泄漏 | 结构化并发 )