Kotlin Coroutine 源码解析 —— 协程是如何运行的

Posted 薛瑄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin Coroutine 源码解析 —— 协程是如何运行的相关的知识,希望对你有一定的参考价值。

前言

在刚接触协程的时候,就产生了一个疑问,协程到底有没有创建线程,如果有,那它和线程池有什么区别?如果没有,为什么在android的主线程中不会 阻塞,了解Android源码的 应该猜到了,肯定是创建了子线程,不然 就会报异常了。那协程又是如何保证任务能切换到主线程呢? 下面就带着这几个问题,来分析协程的源码

示例

用下面的示例 模拟获取token,请求网络的过程。顺序执行,不会阻塞当前线程,

fun main() 

  GlobalScope.launch 
    println("开始 ")
    val token = requestToken()
    val post = createPost(token, "item")
    val postResult = processPost(post)
    println("结束 ")
  
  Thread.sleep(60*60*1000L) // 阻塞主线程来保证 JVM 存活



suspend fun requestToken():String
  println("获取token")
  return  "token"


suspend fun createPost(token:String,item:String):String
  println("创建postItem")
  return  "$token + $item"


suspend fun processPost(post:String):String
  println("执行post请求")
  return  "$post + result"

来看一下,kotlin转换成java的代码

public final class TestKt 
     public static final void main() 
      BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) 
         private CoroutineScope p$;
         Object L$0;
         Object L$1;
         Object L$2;
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) 
            Object var10000;
            label26: 
               CoroutineScope $this$launch;
               String token;
               String post;
               Object var8;
               label25: 
                  var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                  switch(this.label) 
                  case 0:
                     ResultKt.throwOnFailure($result);
                     $this$launch = this.p$;
                     token = "开始 ";
                     boolean var9 = false;
                     System.out.println(token);
                     this.L$0 = $this$launch;
                     this.label = 1;
                     var10000 = TestKt.requestToken(this);
                     if (var10000 == var8) 
                        return var8;
                     
                     break;
                  case 1:
                     $this$launch = (CoroutineScope)this.L$0;
                     ResultKt.throwOnFailure($result);
                     var10000 = $result;
                     break;
                  case 2:
                     token = (String)this.L$1;
                     $this$launch = (CoroutineScope)this.L$0;
                     ResultKt.throwOnFailure($result);
                     var10000 = $result;
                     break label25;
                  case 3:
                     post = (String)this.L$2;
                     token = (String)this.L$1;
                     $this$launch = (CoroutineScope)this.L$0;
                     ResultKt.throwOnFailure($result);
                     var10000 = $result;
                     break label26;
                  default:
                     throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                  

                  token = (String)var10000;
                  this.L$0 = $this$launch;
                  this.L$1 = token;
                  this.label = 2;
                  var10000 = TestKt.createPost(token, "item", this);
                  if (var10000 == var8) 
                     return var8;
                  
               

               post = (String)var10000;
               this.L$0 = $this$launch;
               this.L$1 = token;
               this.L$2 = post;
               this.label = 3;
               var10000 = TestKt.processPost(post, this);
               if (var10000 == var8) 
                  return var8;
               
            

            String postResult = (String)var10000;
            String var6 = "结束 ";
            boolean var7 = false;
            System.out.println(var6);
            return Unit.INSTANCE;
         

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) 
            Intrinsics.checkParameterIsNotNull(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            var3.p$ = (CoroutineScope)value;
            return var3;
         

         public final Object invoke(Object var1, Object var2) 
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         
      ), 3, (Object)null);
      Thread.sleep(3600000L);
   

   // $FF: synthetic method
   public static void main(String[] var0) 
      main();
   

   @Nullable
   public static final Object requestToken(@NotNull Continuation $completion) 
      String var1 = "获取token";
      boolean var2 = false;
      System.out.println(var1);
      return "token";
   

   @Nullable
   public static final Object createPost(@NotNull String token, @NotNull String item, @NotNull Continuation $completion) 
      String var3 = "创建postItem";
      boolean var4 = false;
      System.out.println(var3);
      return token + " + " + item;
   

   @Nullable
   public static final Object processPost(@NotNull String post, @NotNull Continuation $completion) 
      String var2 = "执行post请求";
      boolean var3 = false;
      System.out.println(var2);
      return post + " + result";
   

从java代码,你应该已经看出一些端倪了,最终肯定是调用了invokeSuspend,来顺序执行了业务中的三个函数。没错,需要等待前一个函数执行完,才能执行后一个函数。以一种同步的方式,实现了异步编程。那么接下来的重点就是分析,最终如何调用到这个invokeSuspend函数。

从Kotlin 生成的字节码,可以看出,就是launch中的lambda 参数,最终会转换为 BuildersKt.launch$default 的第四个参数,它的类型是一个匿名内部类

final class com/icisoo/xw/TestKt$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2

可以看到它继承了SuspendLambda ->继承 ContinuationImpl ->继承 BaseContinuationImpl,最终在 resumeWith() 中会调用invokeSuspend()

源码分析

1、launch

从示例的kotlin 代码 开始分析,首先是launch 函数

代码一:
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job 
    //把当前context 加入到已有的CoroutineContext,并返回
    val newContext = newCoroutineContext(context)
    //创建一个协程,它们是继承了AbstractCoroutine
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    //执行协程    
    coroutine.start(start, coroutine, block)
    return coroutine

  • CoroutineContext 协程上下文
  1. 包含当前协程scope的信息, 例如Job, ContinuationInterceptor, CoroutineName 和CoroutineId。
  2. 如果是嵌套的launch,那么里层的launch会使用外层launch的CoroutineContext
  3. CoroutineContext中,是用map来存这些信息的, map的键是这些类的伴生对象,值是这些类的一个实例。 你可以看到这样的代码,来获取context的信息:
val job = context[Job]
val continuationInterceptor = context[ContinuationInterceptor]
  • CoroutineScope 协程的作用域,调用myScope.cancel()可以停止该作用域里的所有任务(Activity界面destroy(),子任务没必要继续执行了),下面介绍几种创建作用域的方式:
  1. 使用lauch, async 等builder创建一个新的子协程。协程(AbstractCoroutine)继承了 CoroutineScope,从父scope中继承了协程上下文 以及Job
  2. 使用coroutineScope Api创建新scope:
    public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
    这个api主要用于方便地创建一个子域,并且管理域中的所有子协程。注意这个方法只有在所有 block中创建的子协程全部执行完毕后,才会退出。如下示例:
      // print输出的结果顺序将会是 1, 2, 3, 4
      coroutineScope 
          delay(1000)
          println("1")
          launch  
              delay(6000) 
              println("3")
          
          println("2")
          return@coroutineScope
      
      println("4")

  1. 继承CoroutineScope. 或直接使用CoroutineScope创建对象,用于处理具有生命周期的对象。

下面看一下Android中Viewmodel的Scope 源码

val ViewModel.viewModelScope: CoroutineScope
        get() 
            val scope: CoroutineScope? = this.getTag(JOB_KEY)
            if (scope != null) 
                return scope
            
            return setTagIfAbsent(JOB_KEY,
                CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate))
        

internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope 
    override val coroutineContext: CoroutineContext = context

    override fun close() 
        coroutineContext.cancel()
    

或者使用CoroutineScope创建对象

class Activity 
    private val mainScope = CoroutineScope(Dispatchers.Default) // use Default for test purposes
    
    fun destroy() 
        mainScope.cancel()
    

    fun doSomething() 
        // 在示例中启动了 10 个协程,且每个都工作了不同的时长
        repeat(10)  i ->
            mainScope.launch 
                delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间
                println("Coroutine $i is done")
            
        
    
 

2、newCoroutineContext

返回到 代码一,分析CoroutineContext的是如何被添加进去,并返回一个CoroutineContext对象的

代码二
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext 
    // 这个 + 号,在CoroutineContext中被重载了,以key value的形式保存
    val combined = coroutineContext + context
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        //这里将一个协程分发器,加入到了CoroutineContext中。
        debug + Dispatchers.Default else debug

Dispatchers.Default 创建的对象是 DefaultScheduler,它继承 ExperimentalCoroutineDispatcher ->继承 ExecutorCoroutineDispatcher ->继承 CoroutineDispatcher

CoroutineDispatcher 顾名思义,它对协程的执行进行分发

3、StandaloneCoroutine

返回到 代码一,分析StandaloneCoroutine对象,

代码三
private open class StandaloneCoroutine(
    parentContext: CoroutineContext,
    active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active) 
    override fun handleJobException(exception: Throwable): Boolean 
        handleCoroutineException(context, exception)
        return true
    

StandaloneCoroutine 继承了AbstractCoroutine,后者是一个重点,它实现了Job, Continuation<T>, CoroutineScope这几个接口。说明它是一个协程,也可以是一个协程作用域,也可以作为一个job,成为CoroutineContext的一项

代码四
public abstract class AbstractCoroutine<in T>(
    /**
     * The context of the parent coroutine.
     */
    @JvmField
    protected val parentContext: CoroutineContext,
    active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope 

4、start

返回到 代码一,分析coroutine.start(start, coroutine, block) 执行到AbstractCoroutine 中的start

代码五
    //R 类型是CoroutineScope  T 类型是Unit
    //参数receiver 是当前对象,也就是StandaloneCoroutine对象
    //参数block 是从 launch的参数传递进来的,也就是示例代码中 launch后的lambda表达式
    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) 
        initParentJob()
        //这里的receiver 和this 是同一个对象
        //receiver 作为CoroutineScope 类型
        //this  作为 Continuation 类型
        start(block, receiver, this)
    

5、CoroutineStart # invoke

接着来到CoroutineStart的invoke 函数

代码六
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
        when (this) 
            // 本例中执行这句,那我们就进入这个分析
            CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
            CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            CoroutineStart.LAZY -> Unit // will start lazily
        

注意,这里的参数block 是示例代码中的一个匿名内部类的对象

6、 startCoroutineCancellable

代码七
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) 
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
    

下面依次分析 createCoroutineUninterceptedinterceptedresumeCancellableWith这三个函数

7、createCoroutineUnintercepted

代码八
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> 
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        //
        create(receiver, probeCompletion)
    else 
        createCoroutineFromSuspendFunction(probeCompletion) 
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        
    

最终会调用到SuspendLambda 类中,示例代码中的匿名内部类继承了SuspendLambda

kotlin/coroutines/jvm/internal/ContinuationImpl.kt

代码九
internal abstract class SuspendLambda(
    public override val arity: Int,
    completion: Continuation<Any?>?
    //调用了ContinuationImpl的一参构造函数,
    //completion  是代码一中 创建的StandaloneCoroutine
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction 
    constructor(arity: Int) : this(arity, null)

    public override fun toString(): String =
        if (completion == null)
            Reflection.renderLambdaToString(this) // this is lambda
        else
            super.toString() // this is continuation

SuspendLambda 继承了ContinuationImpl,下面来看看ContinuationImpl

代码十
internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) 
    //会调用到这个构造函数,
    //completion 是代码一中 创建的StandaloneCoroutine
    //completion?.context 类型是协程上下文,这里是AbstractCoroutine类中的context
    constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)

    public override val context: CoroutineContext
        get() = _context!!

    @Transient
    private var intercepted: Continuation<Any?>? = null
    
    // 代码七中 调用intercepted

以上是关于Kotlin Coroutine 源码解析 —— 协程是如何运行的的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin之协程coroutine lifecycleScope 和 viewModelScope源码

kotlin 协程万字协程 一篇完成kotlin 协程进阶

kotlin - Coroutine 协程

Kotlin 协程单元测试错误:线程“main @coroutine#1 @coroutine#2”中的异常 java.lang.NullPointerException

使用 kotlin-coroutine 时 OkHttp 调度程序崩溃

深入理解 Kotlin Coroutine