Kotlin协程基础概念深入理解

Posted bug樱樱

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin协程基础概念深入理解相关的知识,希望对你有一定的参考价值。

本文需要读者对协程有基础的了解,关于协程的使用,可以参考官方教程:[[play.kotlinlang.org/hands-on/In…](https://link.juejin.cn?target=https%3A%2F%2Fplay.kotlinlang.org%2Fhands-on%2FIntroduction%2520to%2520Coroutines%2520and%2520Channels%2F01_Introduction%255D( “https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/01_Introduction%5D(”)play.kotlinlang.org/hands-on/In… to Coroutines and Channels/01_Introduction)

协程是什么?

协程库是Kotlin语言提供的一个库,用于处理异步和并发场景的框架。

“一个协程是一个可挂起计算的对象。在概念上与线程相似。从某种意义上说,协程代码块会与其他代码同时运行。然而,协程不受任何特定线程的约束。它可能会在某个线程挂起,然后在另一个线程被唤醒。”

从官方文档的描述理解,协程实际上就是一个对象。从下面的例子中,来理解协程到底是个什么东西,以及它和线程有什么不一样的地方。

printTimeAndString方法是一个打印时间和输入字符的方法,在main函数中,通过GlobalScope.launch启动了一个协程,并且delay 1秒,在delay前后,打印时间和print in coroutine start/end。然后,外部线程打印print in thread,并睡眠2s,最后打印end结束main方法。

fun main() 
    printTimeAndString("start")
    GlobalScope.launch 
      	printTimeAndString("print in coroutine start")
        delay(1000)
        printTimeAndString("print in coroutine end")
    
    printTimeAndString("print in thread")
    Thread.sleep(2000)
    printTimeAndString("end")


fun printTimeAndString(text: String) 
    println("$Date(): $text")

打印结果如下所示:

Sat Dec 11 19:09:21 CST 2021: start
Sat Dec 11 19:09:21 CST 2021: print in thread
Sat Dec 11 19:09:21 CST 2021: print in coroutine start
Sat Dec 11 19:09:22 CST 2021: print in coroutine end
Sat Dec 11 19:09:23 CST 2021: end

代码的逻辑顺序为:

  1. 打印start
  2. 打印“print in thread”
  3. 创建一个协程
  4. 打印"print in coroutine start"
  5. 线程进入阻塞状态
  6. 协程delay结束,打印"print in coroutine end"
  7. 线程阻塞结束,打印end

从打印结果来看,线程中的print in thread优先于协程执行,协程中的delay 1s,并没有造成线程阻塞,协程和线程中的逻辑在并行执行。

修改printTimeAndString方法,加上打印当前线程信息,然后执行:

fun printTimeAndString(text: String) 
    println("$Date(): $text: $Thread.currentThread()")

打印结果:

Sat Dec 11 19:10:49 CST 2021: start: Thread[main,5,main]
Sat Dec 11 19:10:49 CST 2021: print in thread: Thread[main,5,main]
Sat Dec 11 19:10:49 CST 2021: print in coroutine start: Thread[DefaultDispatcher-worker-1,5,main]
Sat Dec 11 19:10:50 CST 2021: print in coroutine end: Thread[DefaultDispatcher-worker-1,5,main]
Sat Dec 11 19:10:51 CST 2021: end: Thread[main,5,main]

在这个打印结果中能够很容易发现,协程中的代码执行在另一个线程中,main方法协程以外的方法都是执行在main线程中。两者的线程不同,那么协程和线程的关系就是协程会创建新的线程来执行代码吗?

实际上并不是这样的,协程可以通过配置上下文元素,或者构造不同顶级协程来执行不同的协程,因为上述代码直接使用GlobalScope.launch没有指定任何上下文,它会默认使用调度器元素Dispatchers.Default,这个元素会指定协程在线程池中运行。

如果我们让协程指定在main线程中执行,需要替换GlobalScope.launchrunBlockingrunBlocking会获取当前线程,并根据当前线程去创建一个协程:

fun main() 
    printTimeAndString("start")
    runBlocking 
        printTimeAndString("print in coroutine start")
        delay(1000)
        printTimeAndString("print in coroutine end")
    
    printTimeAndString("print in thread")
    Thread.sleep(2000)
    printTimeAndString("end")

打印结果为:

Sat Dec 11 19:12:02 CST 2021: start: Thread[main,5,main]
Sat Dec 11 19:12:02 CST 2021: print in coroutine start: Thread[main,5,main]
Sat Dec 11 19:12:03 CST 2021: print in coroutine end: Thread[main,5,main]
Sat Dec 11 19:12:03 CST 2021: print in thread: Thread[main,5,main]
Sat Dec 11 19:12:05 CST 2021: end: Thread[main,5,main]

首先,打印顺序发生了变化,原来先打印的“print in thread”出现在"print in coroutine end"之后;其次,打印时间发生了变化,整个代码执行了3秒,而使用GlobalScope.launch用了2秒。这里可能看起来不明显,因为他们的时间长度很接近,读者们可以自己增加delay和sleep的时间自行检验;最后,他们都在同一个main线程上执行。

上面的变化说明,协程和线程在同一个线程上运行代码,协程会阻塞线程,再对比协程通过使用GlobalScope.launch,不难理解,协程运行在线程之上,和普通代码没什么区别。协程的特殊能力是,通过实现不同的协程,可以将协程指定在不同的线程上运行。这也就是为什么网上都说,协程是一个切换线程的框架。

协程的优势

线程是CPU调度的最小单位,所有应用程序的代码都运行于线程之上。操作系统层面上提供了进程、线程的实现。如果是在 android 平台上,我们会发现 Thread 的创建过程中,都会调用 Linux API 中的 pthread_create 函数,这直接说明了 Java 层中的 Thread 和 Linux 系统级别的中的线程是一一对应的。那么这就说明了,线程的创建与销毁,需要与操作系统进行交互,调度操作系统的资源。当需要大量创建和销毁时,系统资源就会造成浪费。所以线程,并不是很“轻量”。

协程与线程不同,协程本质上是代码、是对象。它需要在线程之上运行。所以,它很“轻量”。它不需要调用操作系统层面的方法和资源。所以,当你创建1000个协程对象,也只是内存上的问题。**代码在线程上执行,所以协程也需要在线程上运行。**除了每个协程都要新建一个线程来执行这种极端情况,本质上,线程与协程的轻重体现,就是创建一千个对象和一千个线程的区别。

协程的组成部分

在上一节中,我们使用GlobalScope.launch启动了一个协程,就以launch方法为例,来分析一下协程代码的组成:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job

这是一个CoroutineScope的拓展方法,它有三个参数,一个返回值类型,加上方法本身,一共六个组成部分:

  • 协程作用域 CoroutineScope

  • 协程构建器 launch,协程作用域的拓展函数,协程的构建函数

  • 协程上下文 CoroutineContext,可以通过协程上下文来指定协程调度器

  • 协程启动项 CoroutineStart,定义协程构建的启动选项。 它用于启动、异步和其他协程构建器函数的启动参数

  • 挂起代码块 block:suspend CoroutineScope.() -> Unit,挂起闭包,在指定上下文中调用的协程代码

  • 协程任务 Job,可用于取消协程。

协程作用域 CoroutineScope

CoroutineScope是一个接口,而且内部只定义了一个CoroutineContext属性:

public interface CoroutineScope 
    public val coroutineContext: CoroutineContext

CoroutineScope的作用是声明:**实现CoroutineScope的类,都具有提供协程上下文的能力。**这个接口不需要我们手动去实现,协程库提供了封装好的委托实现。

总结一下,CoroutineScope称之为协程作用域,除了从名字上翻译过来的原因外,它也代表着,CoroutineScope的对象,能够给自身内部提供协程上下文,有了协程上下文,才能使用协程,所以,在CoroutineScope对象的范围内,是协程的作用范围。

CoroutineScope构造方法

   public fun CoroutineScope(context: CoroutineContext): CoroutineScope =
       ContextScope(if (context[Job] != null) context else context + Job()) // 如果context不包含Job元素,默认创建Job()

GlobalScope 全局作用域

GlobalScope实现了CoroutineScope接口,是全局的协程作用域,可用于创建顶级协程。它们在应用程序的整个生命周期中都会存活,不会过早被取消。创建顶级协程时,也可使用带有适当调度器的CoroutineScope对象来启动协程;特定情况下(例如必须在程序的整个生命周期内保持活跃状态的顶级后台进程中)可以通过@OptIn(DelicateCoroutinesApi::class)来安全合法的使用GlobalScope。其他情况可以将其改造成挂起函数,例如:

    fun loadConfiguration() 
        GlobalScope.launch 
            val config = fetchConfigFromServer() // network request
            updateConfiguration(config)
        
    

替换方案如下:

    suspend fun loadConfiguration() = coroutinesScope 
        launch  
            val config = fetchConfigFromServer() // network request
            updateConfiguration(config)
        
        

这里简单介绍一个coroutineScope方法:

    public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R 
        contract 
            callsInPlace(block, InvocationKind.EXACTLY_ONCE)
        
        return suspendCoroutineUninterceptedOrReturn  uCont ->
            val coroutine = ScopeCoroutine(uCont.context, uCont, true)
            coroutine.startUndispatchedOrReturn(coroutine, block)
        
    

coroutineScope方法内部会创建一个ScopeCoroutine,它也是CoroutineScope的实现类。这个方法在内部创建了一个CoroutineScope,并执行block。

MainScope

public fun MainScope(): CoroutineScope = ContextScope(SupervisorJob() + Dispatchers.Main)

上下文自带SupervisorJob和Dispatchers.Main元素。

主要用于UI组件:

    class Activity 
        private val mainScope = MainScope()

        fun destroy() 
            mainScope.cancel()
        
    

协程构建器(coroutine builder

概念:为新协程定义一个范围,协程构建器是CoroutineScope的拓展函数,并继承了CorourineScope的协程上下文来自动传递上下文元素和可取消性。

协程构建器函数有以下特点:

  1. CoroutineScope的拓展函数
  2. 会继承CoroutineScope的上下文
  3. 返回类型为Job

Scoping函数

withContext、coroutineScope等函数与launch不同,他们是挂起方法,一般都接收一个suspend闭包作为参数。在它们的方法内部会创建一个协程,并且可能会指定一些上下文元素(例如withContext),并将block透传,这类函数称为做Scoping函数。他们的结构一般是:

public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
public suspend fun <T> withContext(context: CoroutineContext,block: suspend CoroutineScope.() -> T): T

桥接普通与挂起的方法

例如runBlocking等,这类函数与上面两种情况不同,他们可以将常规的代码切换到挂起代码。

public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T 

runBlocking是个特例,它的作用是:创建一个新的协程并阻塞当前线程,直到协程结束。这个函数不应该在协程中使用,主要是为了main函数和测试设计的。

协程上下文

CoroutineContext是一个接口:

public interface CoroutineContext 
    public operator fun <E : Element> get(key: Key<E>): E? // 通过key获取元素

    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else
            context.fold(this)  acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else 
                    // 确保拦截器始终在上下文中最后(因此在出现时可以快速获取)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else 
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    
                
            

    public fun minusKey(key: Key<*>): CoroutineContext
    public interface Key<E : Element>
    public interface Element : CoroutineContext 
      	//...
    

CoroutineContext中定义了运算符重载方法get、plus:

  • get方法可以让我们通过context[key]的形式获取元素
  • plus方法可以让我们以Dispatchers.Default + SuperviorJob()的形式拼接上下文元素。

从get和plus方法的作用就可以看出,CoroutineContext实际上就是一组元素的集合

既然是一组元素的集合,那么它的数据结构是什么?

CoroutineContext的数据结构

这里主要是plus方法最后合并了一个CombinedContext

internal class CombinedContext(
    private val left: CoroutineContext,
    private val element: Element
) : CoroutineContext, Serializable 

    override fun <E : Element> get(key: Key<E>): E? 
        var cur = this
        while (true) 
            cur.element[key]?.let  return it 
            val next = cur.left
            if (next is CombinedContext) 
                cur = next
             else 
                return next[key]
            
        
    
    // ...

CombinedContext有两个属性leftelement,很明显这是一个链表的节点。所以CoroutineContext内部的数据结构是链表

协程启动项 CoroutineStart

CoroutineStart是一个枚举类:

public enum class CoroutineStart 
    DEFAULT,
    LAZY,
    ATOMIC,
    UNDISPATCHED;

    @InternalCoroutinesApi
    public val isLazy: Boolean get() = this === LAZY

在协程构建器中,用于launch、async和其他协程构造函数的启动参数。

  • DEFAULT:根据上下文立即安排协程执行。
  • LAZY:在需要的时候才启动协程,如果协程 Job 在它开始执行之前被取消,那么它根本不会开始执行,而是会以异常结束。
  • ATOMIC:原子地(以不可取消的方式)根据上下文安排协程执行,这与 DEFAULT 类似,但协程在开始执行之前无法取消。
  • UNDISPATCHED:立即执行协程,直到它在当前线程中的第一个暂停点,类似于使用 Dispatchers.Unconfined 启动的协程。但是,当协程从暂停状态恢复时,它会根据其上下文中的 CoroutineDispatcher 进行调度。

suspend () -> Unit

suspend T.() -> Unit

在Kotlin 1.6版本中,suspend () -> Unit可以作为父类型:

// 定义
class MyClickAction : suspend () -> Unit 
    override suspend fun invoke()  TODO() 

fun launchOnClick(action: suspend () -> Unit) 
// 调用
launchOnClick(MyClickAction())

需要注意的是,suspend () -> Unit作为父类型有两个限制:

  • 不能混合使用挂起block和非挂起block作为父类。

    // Compiler Error: Mixing suspend and non-suspend supertypes is not allowed
    class MyClickAction : suspend () -> Unit, String  
    复制代码
    
  • 不能继承多个挂起类型。

    因为,每个挂起都有一个invoke函数,继承多个可挂起类型时,invoke函数会混乱。

另外,普通的() -> Unit也可以传给fun getSuspending(suspending: suspend () -> Unit) ,Kotlin 1.6.0 引入了从常规Block到挂起Block类型的转换。编译器会自动将普通的block自动转换为suspend的block。

还有一种形式:

suspend 
	// ...

与上面的suspend关键字不同,这里的suspend是一个函数:

public inline fun <R> suspend(noinline block: suspend () -> R): suspend () -> R = block

suspend方法的用处是,通常在普通的函数或lambda中,不能直接使用suspend关键字,且lambda没有参数的情况下,可以使用这个方法创建一个挂起函数。

协程任务 Job

继承自CoroutineContext.Element,是一种上下文元素。Job代表了当前协程任务的对象,赋予了协程结构化并发、生命周期和可取消的能力。

A background job. Conceptually, a job is a cancellable thing with a life-cycle that culminates in its completion.

一个后台工作。 从概念上讲,工作是一个可取消的东西,其生命周期以完成为顶点。

public interface Job : CoroutineContext.Element 
    public val isActive: Boolean
    public val isCompleted: Boolean
    public val isCancelled: Boolean
    public fun start(): Boolean
    public fun cancel(cause: CancellationException? = null)
    public suspend fun join()

Job代表的含义是,封装了协程中需要执行的代码逻辑。Thread类可以代表线程在Java中的对象引用,Job则是代表了协程对象引用。

Job的状态

与Thread相同,Job也有几个状态来表示协程的状态:

StateisActive(是否活跃)isCompleted(是否完成)isCancelled(是否取消)
New (可选初始状态)falsefalsefalse
Active (默认初始状态)truefalsefalse
Completing (短暂态)truefalsefalse
Cancelling (短暂态)falsefalsetrue
Cancelled (完成态)falsetruetrue
Completed (完成态)falsetruefalse
  • New

    启动选项CoroutineStart设置为CoroutineStart.LAZY的情况下,Job的状态是New(创建但未启动),可以通过调用start或join方法来启动Job。

  • Active

    通常情况下的Job是Active状态的(创建且已启动),然而,当launch的启动选项CoroutineStart设置为CoroutineStart.LAZY的情况下,Job的状态是New(创建但未启动),可以通过调用start或join方法来启动Job。

    当协程正在运行时、直到完成或协程失败或取消前,Job的状态都是是Active。

  • Cancelling

    抛出异常的Job会导致其进入Cancelling状态,也可以使用cancel方法来随时取消Job使其立即转换为Cancelling状态。

    直到Job等待其所有子项都取消前,一直是Cancelling状态。

  • Cancelled

    当它递归取消子项,并等待所有的子项都取消后,该Job会进入Cancelled状态。

  • Completing

    与Cancelling相同,在等待所有的子项完成时,会进入这个状态。需要注意的是这个完成中的状态是对于内部的子项的,对于外部观察者看来,父Job仍是Active状态。

  • Completed

    当所有子项都完成,该Job会变为Completed状态。

Job的状态流转:

                                          wait children
    +-----+ start  +--------+ complete   +-------------+  finish  +-----------+
    | New | -----> | Active | ---------> | Completing  | -------> | Completed |
    +-----+        +--------+            +-------------+          +-----------+
                     |  cancel / fail       |
                     |     +----------------+
                     |     |
                     V     V
                 +------------+                           finish  +-----------+
                 | Cancelling | --------------------------------> | Cancelled |
                 +------------+                                   +-----------+

关于取消,有这样一行备注:

其中取消父项会导致立即递归取消其所有子项。 具有除 CancellationException 之外的异常的子项失败会立即取消其父级,从而取消其所有其他子项。 可以使用 SupervisorJob 自定义此行为。

SupervisorJob

SupervisorJob是Job的实现类。与Job的区别是,它的子项可以单独失败,不会导致父协程和其他子协程的失败。

所以SupervisorJob可以实现一个自定义规则的失败子项处理机制:

  • 使用 [launch][CoroutineScope.launch] 创建的子Job失败可以通过上下文中的 CoroutineExceptionHandler 处理。
  • 使用 [async][CoroutineScope.async] 创建的子Job的失败可以通过 Deferred.await 对生成的延迟值进行处理。

如果参数parent指定了值,这个supervisor job就变成了parent的子项,并且当parent失败或取消时被取消。所有的supervisor的子项也会被取消。调用cancel方法若有异常(除了CancellationException),也会导致parent取消。

取消机制是一个比较复杂的专题,单独分享。

获取Job对象

获取Job对象的方式有两种:

  1. 通过协程构建器方法的返回值获取Job对象。
  2. 通过构造方法直接创建,这种方式通常用于协程上下文中。
val job = Job()

// 构造方法
@Suppress("FunctionName")
public fun Job(parent: Job? = null): CompletableJob = JobImpl(parent)

从概念上讲,直接 val job = Job()与使用launch创建一个空代码块相同,即等同于:

val job = launch 
		// no opt

协程拦截器 ContinuationInterceptor

ContinuationInterceptor用来在协程真正运行前,执行拦截操作,拦截器模式中的定义接口。

协程调度器

在分析线程与协程的区别中,我们提到过一个词 – “协程调度器”,当我们给协程配置了调度器后,协程所在的线程发生了变化。很明显协程调度器中隐藏着线程切换的秘密。

协程调度器也是一种协程上下文元素:

public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor 
  	// ...
  	public abstract fun dispatch(context: CoroutineContext, block: Runnable)

CoroutineDispatcher中定义了调度的关键方法dispatch。CoroutineDispatcher最常见的实现是一个单例 Dispatchers :

public actual object Dispatchers 
    @JvmStatic
    public actual val Default: CoroutineDispatcher = createDefaultDispatcher()

    @JvmStatic
    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    @JvmStatic
    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

    @JvmStatic
    public val IO: CoroutineDispatcher = DefaultScheduler.IO

Dispatchers的作用是对调度器进行分组。实际上,CoroutineDispatcher的实现类有很多,例如EventLoop、CommonPool等,而这些,都会在不同的组中使用到。

Default

如果没有在上下文中指定Dispatcher或任何其他的 ContinuationInterceptor,则所有标准构建器(如launch、async等),默认使用CoroutineDispatcher。

它由 JVM 上的共享线程池提供支持。 默认情况下,此调度程序使用的最大并行级别等于 CPU 内核数,但至少为 2。 并行度 X 保证在这个调度器中并行执行的任务不超过 X 个。

    internal fun createDefaultDispatcher(): CoroutineDispatcher =
    		if (useCoroutinesScheduler) DefaultScheduler else CommonPool

useCoroutinesScheduler的实现:

internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let  value ->
    when (value) 
        null, "", "on" -> true
        "off" -> false
        else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
    

Default会根据useCoroutinesScheduler属性(默认为true) 去获取对应的线程池:

  • DefaultScheduler :Kotlin内部自己实现的线程池逻辑
  • CommonPool:Java类库中的Executor实现的线程池逻辑

DefaultScheduler

internal object DefaultScheduler : ExperimentalCoroutineDispatcher() 
    val IO: CoroutineDispatcher = LimitingDispatcher(
        this,
        systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)),
        "Dispatchers.IO",
        TASK_PROBABLY_BLOCKING
    )
  	// ...

DefaultScheduler中定义了一个IO属性,他就是 Dispatchers.IO的实现。

另外DefaultScheduler的dispatch方法在父类ExperimentalCoroutineDispatcher中:

@InternalCoroutinesApi
public open class ExperimentalCoroutineDispatcher(
    private val corePoolSize: Int,
    private val maxPoolSize: Int,
    private val idleWorkerKeepAliveNs: Long,
    private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() 
		// ... 
    override val executor: Executor
        get() = coroutineScheduler

    private var coroutineScheduler = createScheduler()

    override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
        try 
            coroutineScheduler.dispatch(block)
         catch (e: RejectedExecutionException) 
            DefaultExecutor.dispatch(context, block)
        

    public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher 
        require(parallelism > 0)  "Expected positive parallelism level, but have $parallelism" 
        return LimitingDispatcher(this, parallelism, null, TASK_PROBABLY_BLOCKING)
    

  	public fun limited(parallelism: Int): CoroutineDispatcher 
        require(parallelism > 0)  "Expected positive parallelism level, but have $parallelism" 
        require(parallelism <= corePoolSize)  "Expected parallelism level lesser than core pool size ($corePoolSize), but have $parallelism" 
        return LimitingDispatcher(this, parallelism, null, TASK_NON_BLOCKING)
    

    private fun createScheduler() = CoroutineScheduler(corePoolSize, maxPoolSize, idleWorkerKeepAliveNs, schedulerName)

分发逻辑在CoroutineScheduler中:

internal class CoroutineScheduler(
    @JvmField val corePoolSize: Int,
    @JvmField val maxPoolSize: Int,
    @JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
    @JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable 
    fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) 
        val task = createTask(block, taskContext) 
        // 尝试将任务提交到本地队列并根据结果采取行动
        val currentWorker = currentWorker()
        val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
        if (notAdded != null) 
            if (!addToGlobalQueue(notAdded)) 
                // 全局队列在关闭/关闭的最后一步关闭——不应接受更多任务
                throw RejectedExecutionException("$schedulerName was terminated")
            
        
        val skipUnpark = tailDispatch && currentWorker != null
        if (task.mode == TASK_NON_BLOCKING) 
            if (skipUnpark) return
            signalCpuWork()
         else 
            signalBlockingWork(skipUnpark = skipUnpark)
        
    
  	// ... 

大致逻辑是,首先将block转换成了一个Task(实际是一个runnable),然后提交到本地队列,等待执行。

CommonPool

internal object CommonPool : ExecutorCoroutineDispatcher() 
    override fun dispatch(context: CoroutineContext, block: Runnable) 
        try 
            (pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
         catch (e: RejectedExecutionException) 
            unTrackTask()
            // CommonPool only rejects execution when it is being closed and this behavior is reserved
            // for testing purposes, so we don't have to worry about cancelling the affected Job here.
            DefaultExecutor.enqueue(block)
        
    
		// ...    

CommonPool的实现是java类库中的Executor实现线程池逻辑。

IO

public val IO: CoroutineDispatcher = DefaultScheduler.IO

IO组直接使用的DefaultScheduler的IO对象,是一个LimitingDispatcher对象。

其分发实现为:

private class LimitingDispatcher(
    private val dispatcher: ExperimentalCoroutineDispatcher,
    private val parallelism: Int,
    private val name: String?,
    override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor 

    private val queue = ConcurrentLinkedQueue<Runnable>()

    override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)

    private fun dispatch(block: Runnable, tailDispatch: Boolean) 
        var taskToSchedule = block
        while (true) 
            // 提交进行中的任务
            val inFlight = inFlightTasks.incrementAndGet()
            // 快速路径,如果没有达到并行度限制,则分发任务并返回
            if (inFlight <= parallelism) 
                dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
                return
            
            // 达到并行度限制,将任务添加到队列中
            queue.add(taskToSchedule)
            if (inFlightTasks.decrementAndGet() >= parallelism) 
                return
            

            taskToSchedule = queue.poll() ?: return
        
    
		// ... 

可以看出 Dispatchers.Default和IO 是在同一个线程中运行的,也就是共用相同的线程池。但是IO多了线程数量限制。在CoroutineScheduler中:

  1. CoroutineScheduler最多有corePoolSize个线程被创建
  2. corePoolSize它的取值为max(2, CPU核心数)

而在LimitingDispatcher中:

1、创建线程数不能大于maxPoolSize ,公式:max(corePoolSize, min(CPU核心数 * 128, 2^21 - 2))。

Main

需要引入kotlinx-coroutines-android库,它里面有Android对应的Dispatchers.Main实现。

    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    @JvmField
    val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()

    private fun loadMainDispatcher(): MainCoroutineDispatcher 
        return try 
            val factories = if (FAST_SERVICE_LOADER_ENABLED) 
                FastServiceLoader.loadMainDispatcherFactory()
             else 
                // 在Android上编译时,启动R8优化
                ServiceLoader.load(
                        MainDispatcherFactory::class.java,
                        MainDispatcherFactory::class.java.classLoader
                ).iterator().asSequence().toList()
            
            @Suppress("ConstantConditionIf")
            factories.maxByOrNull  it.loadPriority ?.tryCreateDispatcher(factories)
                ?: createMissingDispatcher()
         catch (e: Throwable) 
            // Service loader can throw an exception as well
            createMissingDispatcher(e)
        
    

通过FAST_SERVICE_LOADER_ENABLED字段判断实现逻辑,FAST_SERVICE_LOADER_ENABLED默认为true,FastServiceLoader.loadMainDispatcherFactory的实现为:

    internal fun loadMainDispatcherFactory(): List<MainDispatcherFactory> 
        val clz = MainDispatcherFactory::class.java
        if (!ANDROID_DETECTED) 
            return load(clz, clz.classLoader)
        

        return try 
            val result = ArrayList<MainDispatcherFactory>(2)
            createInstanceOf(clz, "kotlinx.coroutines.android.AndroidDispatcherFactory")?.apply  result.add(this) 
            createInstanceOf(clz, "kotlinx.coroutines.test.internal.TestMainDispatcherFactory")?.apply  result.add(this) 
            result
         catch (e: Throwable) 
            // Fallback to the regular SL in case of any unexpected exception
            load(clz, clz.classLoader)
        
    

    internal val ANDROID_DETECTED = runCatching  Class.forName("android.os.Build") .isSuccess

关于这个方法,官方描述:

此方法尝试以 Android 友好的方式加载 MainDispatcherFactory。 如果我们不是在 Android 上,则此方法回退到常规服务加载,否则我们尝试为 AndroidDispatcherFactory 和 TestMainDispatcherFactory 执行 Class.forName 查找。

所以,Dispatchers.Main是和Android平台强相关的,底层原理大概是:通过调用Looper.getMainLooper()获取handler ,最终通过handler来实现在主线程中运行。 实质就是把任务通过Handler运行在Android的主线程。

internal class AndroidDispatcherFactory : MainDispatcherFactory 

    override fun createDispatcher(allFactories: List<MainDispatcherFactory>) =
        HandlerContext(Looper.getMainLooper().asHandler(async = true))

    override fun hintOnError(): String? = "For tests Dispatchers.setMain from kotlinx-coroutines-test module can be used"

    override val loadPriority: Int
        get() = Int.MAX_VALUE / 2

Unconfined

Unconfined的含义是,任务执行在默认的启动线程。之后由调用resume的线程决定恢复协程的线程:

public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

internal object Unconfined : CoroutineDispatcher() 
    override fun isDispatchNeeded(context: CoroutineContext): Boolean = false

    override fun dispatch(context: CoroutineContext, block: Runnable) 
        // It can only be called by the "yield" function. See also code of "yield" function.
        val yieldContext = context[YieldContext]
        if (yieldContext != null) 
            // report to "yield" that it is an unconfined dispatcher and don't call "block.run()"
            yieldContext.dispatcherWasUnconfined = true
            return
        
        throw UnsupportedOperationException("Dispatchers.Unconfined.dispatch function can only be used by the yield function. " +
            "If you wrap Unconfined dispatcher in your code, make sure you properly delegate " +
            "isDispatchNeeded and dispatch calls.")
    

这个方法的逻辑,与DispatchedContinuation的有关。

DispatchedContinuation是什么,首先,协程的继承关系顶层有一个Continuation,它可以表示一个协程,而且,Continuation中定义了协程可恢复的能力,rsumeWith方法。而DispatchedContinuationContinuation的一个实现,它的resumeWith

    override fun resumeWith(result: Result<T>) 
        val context = continuation.context
        val state = result.toState()
        if (dispatcher.isDispatchNeeded(context)) 
            _state = state
            resumeMode = MODE_ATOMIC_DEFAULT
            dispatcher.dispatch(context, this)
         else 
            executeUnconfined(state, MODE_ATOMIC_DEFAULT) 
                withCoroutineContext(this.context, countOrElement) 
                    continuation.resumeWith(result)
                
            
        
    

通过isDispatchNeeded(是否需要dispatch,Unconfined=false,default,IO=true)判断做不同处理,Unconfined是执行executeUnconfined方法:

private inline fun DispatchedContinuation<*>.executeUnconfined(
    contState: Any?, mode: Int, doYield: Boolean = false,
    block: () -> Unit
): Boolean 
    val eventLoop = ThreadLocalEventLoop.eventLoop
    // If we are yielding and unconfined queue is empty, we can bail out as part of fast path
    if (doYield && eventLoop.isUnconfinedQueueEmpty) return false
    return if (eventLoop.isUnconfinedLoopActive) 
        // When unconfined loop is active -- dispatch continuation for execution to avoid stack overflow
        _state = contState
        resumeMode = mode
        eventLoop.dispatchUnconfined(this)
        true // queued into the active loop
     else 
        // Was not active -- run event loop until all unconfined tasks are executed
        runUnconfinedEventLoop(eventLoop, block = block)
        false
    

这个方法会从Threadlocal中取出eventLoop(eventLoop是和当前线程相关的),判断是否在执行Unconfined任务

  1. 如果在执行则调用EventLoop的dispatchUnconfined方法把Unconfined任务放进EventLoop中。

  2. 如果没有在执行则直接执行。

    internal inline fun DispatchedTask<*>.runUnconfinedEventLoop(
        eventLoop: EventLoop,
        block: () -> Unit
    ) 
        eventLoop.incrementUseCount(unconfined = true)
        try 
            block()
            while (true) 
                if (!eventLoop.processUnconfinedEvent()) break
            
         catch (e: Throwable) 
            handleFatalException(e, null)
         finally 
            eventLoop.decrementUseCount(unconfined = true)
        
    
    

EventLoop是存放在Threadlocal中,所以是跟当前线程相关联的,而EventLoop也是CoroutineDispatcher的一个子类。

internal abstract class EventLoop : CoroutineDispatcher() 
		private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null

		public fun dispatchUnconfined(task: DispatchedTask<*>) 
        val queue = unconfinedQueue ?:
            ArrayQueue<DispatchedTask<*>>().also  unconfinedQueue = it 
        queue.addLast(task)
    

    public fun processUnconfinedEvent(): Boolean 
        val queue = unconfinedQueue ?: return false
        val task = queue.removeFirstOrNull() ?: return false
        task.run()
        return true
    

内部通过一个双向队列实现存放Unconfined任务:

  1. EventLoopdispatchUnconfined方法用于把Unconfined任务放进队列的尾部
  2. processUnconfinedEvent方法用于从队列的头部移出Unconfined任务执行

In the end

到此协程的基本组成介绍的差不多了,可能大家会觉得还有一些概念,比如挂起是什么等等没讲,这些问题会在后续的文章中,给大家一一讲清楚。

另外,关于协程的使用,可以参考官方教程:[[play.kotlinlang.org/hands-on/In…](https://link.juejin.cn?target=https%3A%2F%2Fplay.kotlinlang.org%2Fhands-on%2FIntroduction%2520to%2520Coroutines%2520and%2520Channels%2F01_Introduction%255D( “https://play.kotlinlang.org/hands-on/Introduction%20to%20Coroutines%20and%20Channels/01_Introduction%5D(”)play.kotlinlang.org/hands-on/In… to Coroutines and Channels/01_Introduction)

本文内容较多,如有错误和建议敬请指正。

作者:自动化BUG制造器
链接:https://juejin.cn/post/7040462292944683016

最后

如果想要成为架构师或想突破20~30K薪资范畴,那就不要局限在编码,业务,要会选型、扩展,提升编程思维。此外,良好的职业规划也很重要,学习的习惯很重要,但是最重要的还是要能持之以恒,任何不能坚持落实的计划都是空谈。

如果你没有方向,这里给大家分享一套由阿里高级架构师编写的《Android八大模块进阶笔记》,帮大家将杂乱、零散、碎片化的知识进行体系化的整理,让大家系统而高效地掌握Android开发的各个知识点。

相对于我们平时看的碎片化内容,这份笔记的知识点更系统化,更容易理解和记忆,是严格按照知识体系编排的。

一、架构师筑基必备技能

1、深入理解Java泛型
2、注解深入浅出
3、并发编程
4、数据传输与序列化
5、Java虚拟机原理
6、高效IO

二、Android百大框架源码解析

1.Retrofit 2.0源码解析
2.Okhttp3源码解析
3.ButterKnife源码解析
4.MPAndroidChart 源码解析
5.Glide源码解析
6.Leakcanary 源码解析
7.Universal-lmage-Loader源码解析
8.EventBus 3.0源码解析
9.zxing源码分析
10.Picasso源码解析
11.LottieAndroid使用详解及源码解析
12.Fresco 源码分析——图片加载流程

三、Android性能优化实战解析

  • 腾讯Bugly:对字符串匹配算法的一点理解
  • 爱奇艺:安卓APP崩溃捕获方案——xCrash
  • 字节跳动:深入理解Gradle框架之一:Plugin, Extension, buildSrc
  • 百度APP技术:Android H5首屏优化实践
  • 支付宝客户端架构解析:Android 客户端启动速度优化之「垃圾回收」
  • 携程:从智行 Android 项目看组件化架构实践
  • 网易新闻构建优化:如何让你的构建速度“势如闪电”?

四、高级kotlin强化实战

1、Kotlin入门教程
2、Kotlin 实战避坑指南
3、项目实战《Kotlin Jetpack 实战》

  • 从一个膜拜大神的 Demo 开始

  • Kotlin 写 Gradle 脚本是一种什么体验?

  • Kotlin 编程的三重境界

  • Kotlin 高阶函数

  • Kotlin 泛型

  • Kotlin 扩展

  • Kotlin 委托

  • 协程“不为人知”的调试技巧

  • 图解协程:suspend

五、Android高级UI开源框架进阶解密

1.SmartRefreshLayout的使用
2.Android之PullToRefresh控件源码解析
3.Android-PullToRefresh下拉刷新库基本用法
4.LoadSir-高效易用的加载反馈页管理框架
5.Android通用LoadingView加载框架详解
6.MPAndroidChart实现LineChart(折线图)
7.hellocharts-android使用指南
8.SmartTable使用指南
9.开源项目android-uitableview介绍
10.ExcelPanel 使用指南
11.Android开源项目SlidingMenu深切解析
12.MaterialDrawer使用指南

六、NDK模块开发

1、NDK 模块开发
2、JNI 模块
3、Native 开发工具
4、Linux 编程
5、底层图片处理
6、音视频开发
7、机器学习

七、Flutter技术进阶

1、Flutter跨平台开发概述
2、Windows中Flutter开发环境搭建
3、编写你的第一个Flutter APP
4、Flutter开发环境搭建和调试
5、Dart语法篇之基础语法(一)
6、Dart语法篇之集合的使用与源码解析(二)
7、Dart语法篇之集合操作符函数与源码分析(三)

八、微信小程序开发

1、小程序概述及入门
2、小程序UI开发
3、API操作
4、购物商场项目实战……

全套视频资料:

一、面试合集

二、源码解析合集


三、开源框架合集


欢迎大家一键三连支持,若需要文中资料,直接点击文末CSDN官方认证微信卡片免费领取↓↓↓

以上是关于Kotlin协程基础概念深入理解的主要内容,如果未能解决你的问题,请参考以下文章

深入kotlin- 协程

深入理解Koltin协程:为什么学习 Kotlin 协程?

深入理解Kotlin协程协程的创建启动挂起函数理论篇

深入理解Kotlin协程协程调度器Dispatchers源码追踪扒皮

深入理解Kotlin协程协程的上下文 CoroutineContext

深入理解Kotlin协程CoroutineScope.launch源码追踪扒皮