一文搞明白协程的挂起和恢复

Posted 丶笑看退场

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文搞明白协程的挂起和恢复相关的知识,希望对你有一定的参考价值。

协程是使用非阻塞式挂起的方式来实现线程运行的。那协程又是如何挂起和恢复的,这里面的概念又是什么,带着这些问题就让我们重新探究下协程的挂起和恢复。

我们先创建个协程:

override fun initView() 
        lifecycleScope.launch 
            val num = dealA()
            dealB(num)
        
    

private suspend fun dealA():Int 
        withContext(Dispatchers.IO) 
            delay(3000)
        
        return 1
    

private suspend fun dealB(num:Int) 
        withContext(Dispatchers.IO) 
            delay(1000)
        
    

可以看到写协程的时候要在函数前面加上suspend修饰,这也是常说的挂起函数,那挂起函数又是什么?

挂起函数

了解之前,我们先将上面的挂起函数dealA()反编译成 Java,简单的看看编译后是什么样的?(省略了后面会着重解释的一些代码,主要先看挂起函数的方法)

private final Object dealA(Continuation var1) 
      ......

      Object $result = ((<undefinedtype>)$continuation).result;
      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(((<undefinedtype>)$continuation).label) 
      case 0:
         ResultKt.throwOnFailure($result);
         CoroutineContext var10000 = (CoroutineContext)Dispatchers.getIO();
         Function2 var10001 = (Function2)(new Function2((Continuation)null) 
            int label;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) 
               .......

               return Unit.INSTANCE;
            
     		......
         if (BuildersKt.withContext(var10000, var10001, (Continuation)$continuation) == var4) 
            return var4;
         
         break;
      ......
      

      return Boxing.boxInt(1);
   
     

可以看到suspend经过反编译后,会出现Continuation类型的参数传进去,并且返回的是Object对象。

是不是对Continuation是什么很好奇,这也是协程的核心部分`:

public interface Continuation<in T> 
    //对应于这个延续的协程的上下文
    public val context: CoroutineContext

    //继续执行相应的协程,传递一个成功或失败的 [result] 作为最后一个暂停点的返回值。
    public fun resumeWith(result: Result<T>)

从定义上可以看出Continuation其实就是一个带有泛型参数的callback,而resumeWith也就相当于onSuccess的成功回调,来恢复执行后面的代码,除这个之外,还有一个ContineContext,它就是协程的上下文。

回到dealA方法中,当执行到withContext方法的时候,会返回CoroutineSingletons.COROUTINE_SUSPENDED,表示函数被挂起了,到这里你是不是觉得就结束了,其实还没有。

在查看过程中是不是看到有个invokeSuspend的回调方法还没有被调用,这又是在什么时候会被触发的?

那我们就从刚才执行到的withContext那里进一步查看,写过协程的都知道这就是用来切换线程:

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T 
    contract 
        callsInPlace(block, InvocationKind.EXACTLY_ONCE)
    
    return suspendCoroutineUninterceptedOrReturn sc@  uCont ->
        // compute new context
        val oldContext = uCont.context
        val newContext = oldContext + context
        // always check for cancellation of new context
        newContext.ensureActive()
        // FAST PATH #1 -- 新上下文与旧上下文相同
        if (newContext === oldContext) 
            val coroutine = ScopeCoroutine(newContext, uCont)
            return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
        
        // FAST PATH #2 新的调度程序与旧的调度程序相同
        if (newContext[ContinuationInterceptor] == oldContext[ContinuationInterceptor]) 
            val coroutine = UndispatchedCoroutine(newContext, uCont)
            // 上下文有变化,所以这个线程需要更新
            withCoroutineContext(newContext, null) 
                return@sc coroutine.startUndispatchedOrReturn(coroutine, block)
            
        
        // SLOW PATH -- 使用新的调度程序
        val coroutine = DispatchedCoroutine(newContext, uCont)
        block.startCoroutineCancellable(coroutine, coroutine)
        coroutine.getResult()
    

withContext方法中,传入了两个参数,一个是协程的上下文,另一个就是协程里的代码。可以看到不管新的调度和旧的调度一样最后都是会调用startCoroutineCancellable方法:

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) 
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    

而在startCoroutineCancellable方法中,创建了Coroutination,之后会调用resumeCancelableWith方法:

public fun <T> Continuation<T>.resumeCancellableWith(
    result: Result<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
): Unit = when (this) 
    is DispatchedContinuation -> resumeCancellableWith(result, onCancellation)
    else -> resumeWith(result)

在这里是不是看到了我们之前提到过的resumeWith方法,之前也解释了下它就相当于一个回调。然后我们再来看下它的具体实现,是在ContinuationImpl类中:

public final override fun resumeWith(result: Result<Any?>) 
        var current = this
        var param = result
        while (true) 
          
            probeCoroutineResumed(current)
            with(current) 
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try 
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                     catch (exception: Throwable) 
                        Result.failure(exception)
                    
                .......
            
        
    

resumeWith方法中执行到了我们一直在找的invokeSuspend,通过这个方法将result回调了出去,并判断当前是不是COROUTINE_SUSPENDED(挂起),是挂起直接退出,去执行上面说到的invokeSuspend里面的内容。

在这里我们了解到invokeSuspend是由resumeWith所触发的,那接下来我们看看真正的挂起和恢复如何被执行的。

协程的启动

了解挂起和恢复的过程,要从协程的启动执行开始,我们还是跟刚才一样反编译启动协程的代码:

 BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) 
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) 
            ......
         

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) 
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         

         public final Object invoke(Object var1, Object var2) 
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         
      ), 3, (Object)null);

在协程启动的反编译代码我们又看到了ininvokeSuspend方法,这个方法又是在最下面创建了Continuation,之后在invoke中被调用,更多的信息是看不出来了。我们还是回到launch源码内部里面去去寻找答案。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job 
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine


//coroutine.start
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) 
        start(block, receiver, this)
    

当查看到start这里的时候,你会发现跟进不下去了。那我们就换种方法,还是将这个类反编译下,你会看到变成了这样:

 public final void start(@NotNull CoroutineStart start, Object receiver, @NotNull Function2 block) 
      Intrinsics.checkNotNullParameter(start, "start");
      Intrinsics.checkNotNullParameter(block, "block");
      start.invoke(block, receiver, (Continuation)this);
   

//使用此协程启动策略将带有接收器的相应块作为协程启动
 @InternalCoroutinesApi
    public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) 
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        

在这里又看到了我们熟悉的startCoroutineCancellable,由于默认值为CoroutineStart.DEFAULT,所以该方法会被调用。后面会怎么调用,应该很清楚了,最后会一路调用到invokeSuspend方法,所以这时候就会执行到suspend代码块里面,协程启动!

协程的挂起

调用到invokeinvokeSuspend函数里面的代码的时候,我们单拎出出来看下:

  public final Object invokeSuspend(@NotNull Object $result) 
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            Object var10000;
            ButtonTextActivity var4;
            switch(this.label) 
            case 0:
               ResultKt.throwOnFailure($result);
               var4 = ButtonTextActivity.this;
               this.label = 1;
               var10000 = var4.dealA(this);
               if (var10000 == var3) 
                  return var3;
               
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               var10000 = $result;
               break;
            case 2:
               ResultKt.throwOnFailure($result);
               return Unit.INSTANCE;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            

            int num = ((Number)var10000).intValue();
            var4 = ButtonTextActivity.this;
            this.label = 2;
            if (var4.dealB(num, this) == var3) 
               return var3;
             else 
               return Unit.INSTANCE;
            
         

这里涉及到了label状态机的分析,当label为0时,会调用case为0下面的代码。在里面label被设置为了1,又调用了var4.dealA(this)这个挂起函数,从前面挂起函数的分析知道其会返回COROUTINE_SUSPENDED标志,所以var10000也就会得到COROUTINE_SUSPENDED标志,此时会被判断相等,协程会被挂起。

协程的恢复

挂起后就要恢复了。在前面执行到的dealA方法中,在withContext的时候会触发dealA中的invokeSuspend方法。此时label被设置为1,所以会被调用到case为1的代码:

//dealA 
public final Object invokeSuspend(@NotNull Object $result) 
               Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
               switch(this.label) 
               case 0:
                  ResultKt.throwOnFailure($result);
                  this.label = 1;
                  if (DelayKt.delay(3000L, this) == var2) 
                     return var2;
                  
                  break;
               case 1:
                  ResultKt.throwOnFailure($result);
                  break;
               default:
                  throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
               

               return Unit.INSTANCE;
            

 return Boxing.boxInt(1);

执行了ResultKt.throwOnFailure($result),最后返回int的值。同时launch中的invokeSuspend也被执行,上面已经将label设置为1,这里就会执行到case 1下的代码:

//launch  invokeSuspend
switch(this.label) 
  ......
    case 1:
          ResultKt.throwOnFailure($result);
          var10000 = $result;
          break;


int num = ((Number)var10000).intValue();
            var4 = ButtonTextActivity.this;
            this.label = 2;
            if (var4.dealB(num, this) == var3) 
               return var3;
             else 
               return Unit.INSTANCE;
            

对结果进行了失败处理,此时var10000也就是刚刚得到的int值,接着执行suspend剩余的代码,在下面将lable设置为了2,开始执行dealB的方法。

dealB方法中,跟之前分析的步骤一样,也会回到invokeSuspend中:

private final Object dealB(int num, Continuation $completion) 
      Object var10000 = BuildersKt.withContext((CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) 
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) 
           ......

            return Unit.INSTANCE;
         

         ......
      return var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED() ? var10000 : Unit.INSTANCE;
   

最后当没有挂起函数的时候,会返回Unit.INSTANCE,结束协程执行。

小结

协程通过suspend来标识挂起点,但真正的挂起点还需要通过是否返回COROUTINE_SUSPENDED来判断,而代码体现是通过状态机来处理协程的挂起与恢复。

在挂起和恢复的过程中,当判断挂起函数到返回值是COROUTINE_SUSPENDED标志时,会挂起,在需要挂起的时候,状态机会把之前的结果以成员变量的方式保存在 continuation 中。在挂起函数恢复的时候,会调用Continuation的resumeWith方法,继而触发invokeSuspend。根据保存在Continuation中的label,进入不同的 分支恢复之前保存的状态,进入下一个状态。

在挂起的时候并不会阻塞当前的线程,是因为挂起是在invokeSuspend方法中return出去的,而invokeSuspend之外的函数当然还是会继续执行。

参考

Kotlin协程实现原理:挂起与恢复

Kotlin:深度理解协程挂起恢复实现原理。

Kotlin Jetpack 实战 | 09. 图解协程原理

「Kotlin篇」原来,协程是这么挂起的!

协程到底是怎么切换线程的?

以上是关于一文搞明白协程的挂起和恢复的主要内容,如果未能解决你的问题,请参考以下文章

一文搞明白协程的挂起和恢复

Kotlin 协程协程的挂起和恢复 ① ( 协程的挂起和恢复概念 | 协程的 suspend 挂起函数 )

Kotlin 协程协程的挂起和恢复 ① ( 协程的挂起和恢复概念 | 协程的 suspend 挂起函数 )

Kotlin 协程协程的挂起和恢复 ② ( 协程挂起 和 线程阻塞 对比 )

Kotlin 协程协程的挂起和恢复 ② ( 协程挂起 和 线程阻塞 对比 )

Kotlin 协程Flow 异步流 ① ( 以异步返回返回多个返回值 | 同步调用返回多个值的弊端 | 尝试在 sequence 中调用挂起函数返回多个返回值 | 协程中调用挂起函数返回集合 )