Kotlin 协程源码解析

Posted jackingzheng

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程源码解析相关的知识,希望对你有一定的参考价值。

本文适合有协程使用基础并想了解内部原理的开发者

项目地址:kotlinx.coroutines, 分析的版本: v1.3
本文结构

1、什么协程(Coroutines)
2、编译器实现协程的异步操作的同步调用
3、协程上下文切换
4、协程执行效率要高于线程/线程池
5、结构化并发原理
6、总结

一、什么协程(Coroutines)

看下Android文档对协程的定义:协程是一种并发设计模式,您可以在 android 平台上使用它来简化异步执行的代码。协程是在版本 1.3 中添加到 Kotlin 的,它基于来自其他语言的既定概念。

本文主要分析协程以下功能的内部实现

  1. 编译器是如何实现协程的异步操作的同步调用?(如何帮我们解决异步操作回调地狱呢?)
  2. coroutine是如何通过CoroutineContext来切换上下文的?
  3. coroutine挂起和恢复是如何实现的。
  4. 为什么协程效率要高于线程/线程池?
  5. 协程的结构化并发是如何实现的?

二、编译器实现协程的异步操作的同步调用。

2.1 CPS(Continuation Passing Style)

我们先看一个经典的网络请求并更新UI的例子

private val uiHandler = Handler(Looper.getMainLooper())

fun fetchDocs() {
    get("https://developer.android.com") { result ->
        show(result)
    }
}

fun get(url: String, onSuccess: (String) -> Unit) {
    Thread {
        print("进行网络请求 get = $url")
        onSuccess("Docs Content")
    }.start()
}

fun show(content: String) {
    uiHandler.post { print("显示dialog $content") }
}

从上面例子可以看到我们需要创建子线程进行网络请求,然后通过回调调用show(),show()里面uiHandler切换到UI线程更新UI。能看出我们通过回调来返回异步操作结果。

我们看看经过协程改造后的例子

fun fetchDocs() {                             
    CoroutineScope(Dispatchers.Main).launch {  // Dispatchers.Main
        val result = get("https://developer.android.com") // Dispatchers.IO for `get`
        show(result)                                      // Dispatchers.Main
    }
}

suspend fun get(url: String) = withContext(Dispatchers.IO) {
    print("进行网络请求 get = $url")
    return@withContext "success"
}

fun show(content: String) {
    print("显示dialog $content")
}

我们看到get()的回调不见了,使用withContext(Dispatchers.IO)指定在IO线程执行,并同步方式的返回请求结果,show()将结果更新到UI上。

协程使用了什么魔法把异步的调用变成同步调用呢?

我们把上面代码反编译成Java代码,看看他施了什么魔法,代码如下

public final class CoroutineTest2Kt {
   public static final void fetchDocs() {
      BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope(Dispatchers.getMain()), null, null, new Function2((Continuation)null) {
         private CoroutineScope p$;
         Object L$0;
         // 初始状态 label = 0
         int label;

         // 该调用在Dispatchers.Main
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            Object var10000;
            CoroutineScope $this$launch;
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               $this$launch = this.p$;
               this.L$0 = $this$launch;
               // 将label切换为1
               this.label = 1;
               // 把本continuation传给get(),并挂起自身continuation
               var10000 = CoroutineTest2Kt.get("https://developer.android.com", this);
               if (var10000 == var4) {
                  return var4;
               }
               break;
            case 1:
               $this$launch = (CoroutineScope)this.L$0;
               ResultKt.throwOnFailure($result);
               var10000 = $result;
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
						
            // resumeWhite()重新开始,继续执行,在UI显示
            String result = (String)var10000;
            CoroutineTest2Kt.show(result);
            return Unit.INSTANCE;
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkParameterIsNotNull(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            var3.p$ = (CoroutineScope)value;
            return var3;
         }

         public final Object invoke(Object var1, Continuation var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), 3, (Object)null);
   }

   @Nullable
   public static final Object get(@NotNull final String url, @NotNull Continuation $completion) {
      return BuildersKt.withContext((CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null) {
         private CoroutineScope p$;
         int label;
         
         // 该调用在Dispatchers.IO
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               // 在IO中处理数据
               ResultKt.throwOnFailure($result);
               CoroutineScope $this$withContext = this.p$;
               String var3 = "进行网络请求 get = " + url;
               boolean var4 = false;
               System.out.print(var3);
               // 返回处理成功
               return "success";
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkParameterIsNotNull(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            var3.p$ = (CoroutineScope)value;
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), $completion);
   }

   public static final void show(@NotNull String content) {
      Intrinsics.checkParameterIsNotNull(content, "content");
      String var1 = "显示dialog " + content;
      boolean var2 = false;
      System.out.print(var1);
   }
}

答案就是:CPS(Continuation Passing Style)技术,通过编译的手段,对suspend函数添加continuation参数,Continuation内部是状态机,通过label变量来实现Continuation内部的状态的切换,所以我们虽然写协程代码是同步方式,但编译器会把suspend代码转化成异步操作。这里我们只要知道编译器帮我们生成了Continuation,至于协程内部如何切换协程上下文的,如何实现协程的挂起和恢复的,我们继续往下看。

三、协程上下文切换

3.1 launch的源码分析

我们通过分析launch()方法来回答以下问题。

  1. Coroutine是如何切换上下文CoroutineContext?
  2. Coroutine的resumeWith和suspendInvoke如何实现挂起和恢复?
public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // 创建新协程上下文
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        // 创建标准协程
        StandaloneCoroutine(newContext, active = true)
    // 启动协程
    coroutine.start(start, coroutine, block)
    // 返回启动的协程
    return coroutine
}


start(block, receiver, this) 传参中receiver对应launch中的StandaloneCoroutine,this对应StandaloneCoroutine

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
    initParentJob()
    start(block, receiver, this)
}
public enum class CoroutineStart {
		DEFAULT,
		LAZY,
		ATOMIC,

    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
          	// 启动可取消的协程
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // will start lazily
        }
}
// 使用此函数以可取消的方式启动协程,以便在等待调度时可以取消协程。
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
    runSafely(completion) {
   // 创建CoroutineUnintercepted
  createCoroutineUnintercepted(receiver,completion).intercepted().resumeCancellableWith(Result.success(Unit))
    }

createCoroutineUnintercepted()的实现

JVM createCoroutineUnintercepted实现

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
      	// 进入这个流程 -》创建协程suspend函数
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}
private inline fun <T> createCoroutineFromSuspendFunction(
    completion: Continuation<T>,
    crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit> {
    val context = completion.context
    // label == 0 when coroutine is not started yet (initially) or label == 1 when it was
    return if (context === EmptyCoroutineContext)
        // 省略
    else 
  			// 进入这个流程:创建一个ContinuationImpl单例
        object : ContinuationImpl(completion as Continuation<Any?>, context) {
            private var label = 0

            override fun invokeSuspend(result: Result<Any?>): Any? =
                when (label) {
                    0 -> {
                        label = 1
                        result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
                        block(this) // run the block, may return or suspend
                    }
                    1 -> {
                        label = 2
                        result.getOrThrow() // this is the result if the block had suspended
                    }
                    else -> error("This coroutine had already completed")
                }
        }
}

intercepted()的实现

public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
		// 调用ContinuationImpl的intercepted()
    (this as? ContinuationImpl)?.intercepted() ?: this

ContinuationImpl.intercepted()的实现

调用interceptContinuation(this)获取该continuation包装所提供的continuation,从而截获所有恢复。

internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {
  	// ContinuationImpl的intercepted()实现如下,主要是
		public fun intercepted(): Continuation<Any?> = intercepted ?: 
  // 从context找到ContinuationInterceptor,进行拦截
  (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also { intercepted = it }
}

我们看下interceptContinuation

public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
   // continuation 对应 ContinuationImpl,CoroutineDispatcher对应MainCoroutineDispatcher
    public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        DispatchedContinuation(this, continuation)
}

interceptContinuation()会创建一个DispatchedContinuation

接着resumeCancellableWith()

public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this) {
  	// 进入DispatchedContinuation这个分支
    is DispatchedContinuation -> resumeCancellableWith(result)
    else -> resumeWith(result)
}

我们看下resumeCancellableWith()的实现

inline fun resumeCancellableWith(result: Result<T>) {
        val state = result.toState()
  			// 先isDispatchNeeded()判断是否进行分发
        if (dispatcher.isDispatchNeeded(context)) {
            _state = state
            resumeMode = MODE_CANCELLABLE
          	// 传人runnable给dispatch进行处理 
            dispatcher.dispatch(context, this)
        } else {
            executeUnconfined(state, MODE_CANCELLABLE) {
                if (!resumeCancelled()) {
                    resumeUndispatchedWith(result)
                }
            }
        }
    }

dispatcher执行的Runable的内容是

// DispatchedTask
public final override fun run() {
        val taskContext = this.taskContext
        var fatalException: Throwable? = null
        try {
            val delegate = delegate as DispatchedContinuation<T>
            val continuation = delegate.continuation
            val context = continuation.context
          	// 获取当前状态
            val state = takeState() 
            withCoroutineContext(context, delegate.countOrElement) {
              	// 测查状态是否Exception
                val exception = getExceptionalResult(state)
              	// 获取context的job
                val job = if (resumeMode.isCancellableMode) context[Job] else null
                
                if (exception == null && job != null && !job.isActive) {
                  	// 进行处理job
                    val cause = job.getCancellationException()
                  	// 如果是可取消job,则进行取消
                    cancelResult(state, cause)
                  	// continuation会调用resumeWith()
                    continuation.resumeWithStackTrace(cause)
                } else {
                    if (exception != null) continuation.以上是关于Kotlin 协程源码解析的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程源码解析

Kotlin 协程源码解析

Kotlin Coroutine 源码解析 —— 协程是如何运行的

Kotlin Coroutine 源码解析 —— 协程是如何运行的

kotlin 协程万字协程 一篇完成kotlin 协程进阶

Kotlin协程源码分析-协程的启动