深入理解Kotlin协程协程调度器Dispatchers源码追踪扒皮

Posted 川峰

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入理解Kotlin协程协程调度器Dispatchers源码追踪扒皮相关的知识,希望对你有一定的参考价值。

android中的Kotlin协程调度器主要有四种:Dispatchers.Main、Dispatchers.Default、Dispatchers.IO、Dispatchers.Unconfined,第四种平时开发应该一般不会用到,所以主要了解一下前3种的实现。

Dispatchers.Main

public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
// MainDispatchers.kt
internal object MainDispatcherLoader 
    private val FAST_SERVICE_LOADER_ENABLED = systemProp(FAST_SERVICE_LOADER_PROPERTY_NAME, true)
    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher 
        return try 
            val factories = if (FAST_SERVICE_LOADER_ENABLED) 
                FastServiceLoader.loadMainDispatcherFactory()
             else 
                // We are explicitly using the
                // `ServiceLoader.load(MyClass::class.java, MyClass::class.java.classLoader).iterator()`
                // form of the ServiceLoader call to enable R8 optimization when compiled on Android.
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader
                ).iterator().asSequence().toList()
            
            @Suppress("ConstantConditionIf")
            factories.maxByOrNull  it.loadPriority ?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
         catch (e: Throwable) 
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        
    

可以看到Dispatchers.Main就是单例对象MainDispatcherLoader.loadMainDispatcher()方法的返回值,该方法会通过MainDispatcherFactory去创建一个MainCoroutineDispatcher对象。因此Dispatchers.Main属于MainCoroutineDispatcher类型。MainDispatcherFactory是一个抽象接口,它的实现类是AndroidDispatcherFactory,如下:

internal class AndroidDispatcherFactory : MainDispatcherFactory 
    override fun createDispatcher(allFactories: List<MainDispatcherFactory>): MainCoroutineDispatcher 
        val mainLooper = Looper.getMainLooper() ?: throw IllegalStateException("The main looper is not available")
        return HandlerContext(mainLooper.asHandler(async = true))
    

    override fun hintOnError(): String = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"
    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2

很明显这里createDispatcher方法创建了一个HandlerContext对象返回,并且我们注意到,它使用主线程的Looper对象来创建的Handler

事实上 Dispatchers.Main 是一个多平台化的 API,在 Android、JavaFX、Swing等场景下实现的细节都不同,我们用 Intelij IDEA 打开 kotlinx.coroutines 项目的源码,这三者的实现都位于 ui 这个目录下。我们在 kotlinx.coroutines.android 包下找到了 HandlerDispatcher.kt 这个代码文件,HandlerContext的具体实现就在其中。           因此,我们可以得到一个结论就是:在 Android 平台上 Dispatchers.Main 就是一个 HandlerContext 对象。          HandlerContext的主要源码如下:
// HandlerDispatcher.kt
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay 
    constructor(
        handler: Handler,
        name: String? = null
    ) : this(handler, name, false)

    @Volatile
    private var _immediate: HandlerContext? = if (invokeImmediately) this else null

    override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also  _immediate = it 

    override fun isDispatchNeeded(context: CoroutineContext): Boolean 
        return !invokeImmediately || Looper.myLooper() != handler.looper
    

    override fun dispatch(context: CoroutineContext, block: Runnable) 
        if (!handler.post(block)) 
            cancelOnRejection(context, block)
        
    
    ...
可以看到它的 dispatch() 也就是线程调度方法,其实就是通过Handler.post()将任务发送到主线程的消息队列中等待执行,为什么是主线程,因为前面看到工厂类创建它的时候使用的Looper.MainLooper。所以kotlin协程在Android平台上的实现还是离不开老本行Handler消息循环机制,因为Android的主线程UI刷新就是基于Handler消息循环机制。             注意到,HandlerContext 是 HandlerDispatcher(密封类) 的子类,而 HandlerDispatcher 是 MainCoroutineDispatcher 的子类,MainCoroutineDispatcher 又是 CoroutineDispatcher 的子类:
public sealed class HandlerDispatcher : MainCoroutineDispatcher(), Delay  
public abstract class MainCoroutineDispatcher : CoroutineDispatcher()  

CoroutineDispatcher 所有调度器抽象基类,它是AbstractCoroutineContextElement的子类,也就是协程上下文的抽象子类。所以说调度器可以用 + 运算符添加到context中。

public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor 
    ...
    public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true
    public abstract fun dispatch(context: CoroutineContext, block: Runnable)
    ...

CoroutineDispatcher  里面有两个重要的方法 isDispatchNeeded() 和 dispatch()

  • isDispatchNeeded():如果协程的执行应该使用 dispatch() 方法执行,则返回 true。大多数调度程序的默认行为是返回 true
  • dispatch():将可运行的 block 的执行分派到给定 context 中的另一个线程。
也就是说,如果isDispatchNeeded()返回true,则执行dispatch()进行调度。               CoroutineDispatcher 还实现了拦截器接口CoroutineInterceptor,也就是说调度器本身就是一个续体的拦截器(在挂起函数恢复时调用)。             CoroutineInterceptor中的interceptContinuation 接口方法会返回一个包装了原始Continuation对象的新Continuation对象
public interface ContinuationInterceptor : CoroutineContext.Element 
    ...
    public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
    ...

在 CoroutineDispatcher 中这个方法的实现如下:

public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor 
    ...
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation)
    ...

它返回的是一个DispatchedContinuation对象:

internal class DispatchedContinuation<in T>(
    @JvmField val dispatcher: CoroutineDispatcher,
    @JvmField val continuation: Continuation<T>
) : DispatchedTask<T>(MODE_UNINITIALIZED), CoroutineStackFrame, Continuation<T> by continuation 
      ...
    override fun resumeWith(result: Result<T>) 
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) 
            _state = state
            resumeMode = MODE_ATOMIC
            dispatcher.dispatch(context, this)
         else 
            executeUnconfined(state, MODE_ATOMIC) 
                withCoroutineContext(this.context, countOrElement) 
                    continuation.resumeWith(result)
                
            
        
   
    ...

上面在DispatchedContinuation类的resumeWith()方法中,先判断了isDispatchNeeded()的值,如果是true就调用调度器dispatcher.dispatch()方法进行调度,并将当前DispatchedContinuation对象自身作为this参数传入。

Dispatchers.Main.immediate

我们知道,lifecycleScope 和 viewModelScope 的协程上下文都是 SupervisorJob + Dispatchers.Main.immediate。 按照前面的理解Main已经是主线程了,那么immediate又是什么?因为 Dispatchers.Main 是 HandlerContext, 所以 Dispatchers.Main.immediate 就是 HandlerContext.immediate 。再仔细的看一下 HandlerContext 类: 
internal class HandlerContext private constructor(
    private val handler: Handler,
    private val name: String?,
    private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay 
    ...
    @Volatile
    private var _immediate: HandlerContext? = if (invokeImmediately) this else null

    override val immediate: HandlerContext = _immediate ?:
        HandlerContext(handler, name, true).also  _immediate = it 

    override fun isDispatchNeeded(context: CoroutineContext): Boolean 
        return !invokeImmediately || Looper.myLooper() != handler.looper
    

    override fun dispatch(context: CoroutineContext, block: Runnable) 
        if (!handler.post(block)) 
            cancelOnRejection(context, block)
        
    
    ...

这里HandlerContext.immediate仍然是创建的自身类型。因此可以得知 Dispatchers.Main.immediate 的真相是一个第三个参数为 true 的 HandlerContext 对象。

同时,可以关注到 HandlerContext 类的 isDispatchNeeded() 方法的判断逻辑是 !invokeImmediately || Looper.myLooper() != handler.looper 返回true的时候才会进行调度dispatch()。而在构造时第3个参数传入的值为true,也就是说 invokeImmediately = true,因此判断逻辑主要取决于后半部分 Looper.myLooper() != handler.looper,而前面分析过 HandlerContext 的 handler 成员变量在构造 HandlerContext 时传入的是 持有主线程Looper的Hanlder对象,也就是说,isDispatchNeeded() 判断逻辑就可以简化成:如果当前线程不是主线程,则调用dispatch()进行调度,否则就不用调度。         至此,可以得到结论是:Dispatchers.Main.immediate 这种调度器与 Dispatchers.Main 的唯一区别是前者只需要简单的判断当前是不是主线程,来决定是否执行调度。          因此,Dispatchers.Main.immediate 的含义就是立即在主线程上执行。

Dispatchers.Default

@JvmStatic
public actual val Default: CoroutineDispatcher = DefaultScheduler

internal object DefaultScheduler : SchedulerCoroutineDispatcher(
    CORE_POOL_SIZE, MAX_POOL_SIZE,
    IDLE_WORKER_KEEP_ALIVE_NS, DEFAULT_SCHEDULER_NAME
) 
    ...
    override fun toString(): String = "Dispatchers.Default"


internal open class SchedulerCoroutineDispatcher(
    private val corePoolSize: Int = CORE_POOL_SIZE,
    private val maxPoolSize: Int = MAX_POOL_SIZE,
    private val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    private val schedulerName: String = "CoroutineScheduler",
) : ExecutorCoroutineDispatcher() 

    override val executor: Executor
        get() = coroutineScheduler

    private var coroutineScheduler = createScheduler()

    private fun createScheduler() =
        CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit = coroutineScheduler.dispatch(block)

    ...

DefaultsScheduler是一个SchedulerCoroutineDispatcher类型,SchedulerCoroutineDispatcher类中它真正创建的是一个CoroutineScheduler对象:

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable 
    init 
        require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) 
            "Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
        
        require(maxPoolSize >= corePoolSize) 
            "Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
        
        require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) 
            "Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
        
        require(idleWorkerKeepAliveNs > 0) 
            "Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
        
    
    ...

CoroutineScheduler类的代码比较多,但是我们根据这些参数知道它就是一个线程池,是一个专门自定义的线程池,而没有使用Java版本的ThreadPoolExecutor。

对于线程池一般而言最重要的就是研究它的几个配置参数,来看一下构造SchedulerCoroutineDispatcher时传入的几个参数:
@JvmField
internal val CORE_POOL_SIZE = systemProp(
    "kotlinx.coroutines.scheduler.core.pool.size",
    AVAILABLE_PROCESSORS.coerceAtLeast(2),
    minValue = CoroutineScheduler.MIN_SUPPORTED_POOL_SIZE
)
@JvmField
internal val MAX_POOL_SIZE = systemProp(
    "kotlinx.coroutines.scheduler.max.pool.size",
    CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE,
    maxValue = CoroutineScheduler.MAX_SUPPORTED_POOL_SIZE
)
// number of processors at startup for consistent prop initialization
internal val AVAILABLE_PROCESSORS = Runtime.getRuntime().availableProcessors()
// Masks of control state
private const val BLOCKING_SHIFT = 21 // 2M threads max
...
internal const val MIN_SUPPORTED_POOL_SIZE = 1 
internal const val MAX_SUPPORTED_POOL_SIZE = (1 shl BLOCKING_SHIFT) - 2
@JvmField
internal val IDLE_WORKER_KEEP_ALIVE_NS = TimeUnit.SECONDS.toNanos(
    systemProp("kotlinx.coroutines.scheduler.keep.alive.sec", 60L)
)
// Internal debuggability name + thread name prefixes
internal const val DEFAULT_SCHEDULER_NAME = "DefaultDispatcher"

总结一下就是:

  • corePoolSize = CORE_POOL_SIZE, 核心线程数,最小值为 2 最大值为 Runtime.getRuntime().availableProcessors()  即CPU的核心数量
  • maxPoolSize = MAX_POOL_SIZE, 最大线程数,值为  (1 << 21) - 2,其中 1 << 21 大小正好是 2M,所以是 2M - 2  = 2097150(两百多万个)
  • idleWorkerKeepAliveNs = Long = IDLE_WORKER_KEEP_ALIVE_NS, 空闲任务的存活时间,值为60s
  • schedulerName = DEFAULT_SCHEDULER_NAME, 默认的调度器名字,值为"DefaultDispatcher"

Dispatchers.IO

@JvmStatic
public val IO: CoroutineDispatcher = Defaultioscheduler
// Dispatchers.IO
internal object DefaultIoScheduler : ExecutorCoroutineDispatcher(), Executor 

    private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )

    override val executor: Executor
        get() = this

    override fun execute(command: java.lang.Runnable) = dispatch(EmptyCoroutineContext, command)

    override fun dispatch(context: CoroutineContext, block: Runnable) 
        default.dispatch(context, block)
    
     ...

dispatch()方法调用了default.dispatch(),而default是由UnlimitedIoScheduler.limitedParallelism()方法返回一个LimitedDispatcher对象:

public open fun limitedParallelism(parallelism: Int): CoroutineDispatcher 
    parallelism.checkParallelism()
    return LimitedDispatcher(this, parallelism)

internal class LimitedDispatcher(
    private val dispatcher: CoroutineDispatcher,
    private val parallelism: Int
) : CoroutineDispatcher(), Runnable, Delay by (dispatcher as? Delay ?: DefaultDelay) 
    ....
    override fun dispatch(context: CoroutineContext, block: Runnable) 
        dispatchInternal(block) 
            dispatcher.dispatch(this, this)
        
    

LimitedDispatcher内部也只是包装了传入的UnlimitedIoScheduler对象而已,而在UnlimitedIoScheduler中仍然是调用的DefaultScheduler.dispatchWithContext方法。

// The unlimited instance of Dispatchers.IO that utilizes all the threads CoroutineScheduler provides
private object UnlimitedIoScheduler : CoroutineDispatcher() 

    @InternalCoroutinesApi
    override fun dispatchYield(context: CoroutineContext, block: Runnable) 
        DefaultScheduler.dispatchWithContext(block, BlockingContext, true)
    


    override fun dispatch(context: CoroutineContext, block: Runnable) 
        DefaultScheduler.dispatchWithContext(block, BlockingContext, false)
    

因此,至此我们得到一个结论就是: Dispatchers.IO 跟 Dispatchers.Default 是共享同一个线程池的。

那 Dispatchers.IO 跟 Dispatchers.Default 有什么不同呢?我们看到在LimitedDispatcher的构造函数中传入了一个parallelism参数,它表示并发任务的执行数量。这个参数的值是在前面调用UnlimitedIoScheduler.limitedParallelism()时设置的:
private val default = UnlimitedIoScheduler.limitedParallelism(
        systemProp(
            IO_PARALLELISM_PROPERTY_NAME,
            64.coerceAtLeast(AVAILABLE_PROCESSORS)
        )
    )
public const val IO_PARALLELISM_PROPERTY_NAME: String = "kotlinx.coroutines.io.parallelism"
这里它传入的值为 64 和 Runtime.getRuntime().availableProcessors() 之间的最大值,也就是说并发的协程任务数被控制在 [N*cpu, 64] 或 [64, N*cpu] 的范围内。          这里为什么要限制  Dispatchers.IO 这种调度器的并发任务执行数量呢? 由于 IO 任务通常会阻塞实际执行任务的线程,在阻塞过程中线程虽然不占用 CPU,  但却占用了大量的内存,这段时间内被 IO 任务占据线程实际上是资源使用不合理的表现,因此 IO 调度器对于 IO 任务的并发做了限制,避免过多的 IO 任务并发占用过多的系统资源,同时在调度时为任务打上 PROBABLY BLOCKING 标签,以方便线程池在执行任务调度时对阻塞任务和非阻塞任务区别对待。                     最后再来个总结:
  • Dispatchers.Main :在 Android 平台上 Dispatchers.Main 就是一个 HandlerContext 对象。
  • Dispatchers.Main.immediate:与 Dispatchers.Main 的唯一区别就是它的调度执行只需要简单的判断当前是不是主线程。
  • Dispatchers.Default:使用自定义的线程池,核心线程数最小为2最大为CPU核心数,最大线程数2M左右,空闲任务存活时间为60s
  • Dispatchers.IO :与 Dispatchers.Default 是共用同一个线程池的,但是增加了对并发任务的数量控制。

以上是关于深入理解Kotlin协程协程调度器Dispatchers源码追踪扒皮的主要内容,如果未能解决你的问题,请参考以下文章

深入理解Kotlin协程协程的分类与线程的区别

Kotlin 协程协程底层实现 ② ( 协程调度器 | 协程任务泄漏 | 结构化并发 )

Kotlin 协程协程底层实现 ② ( 协程调度器 | 协程任务泄漏 | 结构化并发 )

深入理解Kotlin协程协程的创建启动挂起函数理论篇

深入理解Kotlin协程协程的上下文 CoroutineContext

深入理解Kotlin协程协程中的Channel和Flow & 协程中的线程安全问题