Kotlin Coroutines - How launch Works (Notes)

Posted by 且听真言


I. How is a coroutine created?

launch and async create and start new coroutines. But how exactly does a coroutine get created?

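import kotlinx.coroutines.*

fun main() = runBlocking<Unit> {
    // Runs inside the coroutine created by runBlocking
    println(Thread.currentThread().name)
    launch {
        // Runs inside the coroutine created by launch
        println(Thread.currentThread().name)
        delay(100L)
    }
    Thread.sleep(1000L)
}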


Log
main @coroutine#1
main @coroutine#2

Process finished with exit code 0

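runBlocking starts the first coroutine, and launch starts the second. (The @coroutine#N suffix in the log is printed only when coroutine debug mode is on, for example with the JVM option -Dkotlinx.coroutines.debug.)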

1. The basic APIs for starting a coroutine



public fun <T> (suspend () -> T).createCoroutine(
    completion: Continuation<T>
): Continuation<Unit> =
    SafeContinuation(createCoroutineUnintercepted(completion).intercepted(), COROUTINE_SUSPENDED)

public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

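createCoroutine and startCoroutine are the two most basic APIs for creating coroutines in Kotlin. The three common ways to start a coroutine (launch, runBlocking, and async) belong to the intermediate layer of the coroutine framework, and all of them call these base-layer APIs underneath.

createCoroutine and startCoroutine are extension functions whose receiver is a function type, suspend () -> T, meaning "a suspend function or lambda that takes no parameters and returns T". Both take a single parameter of type Continuation<T>; createCoroutine additionally returns a Continuation<Unit>.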

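import kotlin.coroutines.*
import kotlinx.coroutines.delay

val block = suspend {
    println("Hello")
    delay(1000L)
    println("World!")
    "Result"
}

fun testLaunch2() {
    // Completion callback: receives the value that block returns
    val continuation = object : Continuation<String> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<String>) {
            println("Result:" + result.getOrNull())
        }
    }
    // Creates the coroutine and starts it immediately
    block.startCoroutine(continuation)
}

fun main() {
    testLaunch2()
    // Keep the JVM alive until the delayed coroutine finishes
    Thread.sleep(2000L)
}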


Log
Hello
World!
Result:Result

Process finished with exit code 0

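Any function or lambda of type suspend () -> T can be started as a coroutine with block.startCoroutine().

Continuation is used in two ways. When implementing a suspend function, it passes the function's result back to the caller. When calling a suspend function, an anonymous Continuation object receives that result.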
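The two roles can be seen side by side in a minimal sketch that uses only the standard library. The names fetchGreeting and receiveGreeting are made up for this illustration and do not come from the original notes.

```kotlin
import kotlin.coroutines.*

// Use 1: inside a suspend function, the continuation hands the result back.
// fetchGreeting is a hypothetical function for illustration only.
suspend fun fetchGreeting(): String = suspendCoroutine { cont ->
    cont.resume("Hello from a suspend function")
}

// Use 2: at the call site, an anonymous Continuation receives the result.
fun receiveGreeting(): String? {
    var received: String? = null
    val call: suspend () -> String = { fetchGreeting() }
    call.startCoroutine(object : Continuation<String> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<String>) {
            received = result.getOrNull()
        }
    })
    // Nothing suspends in this sketch, so the result is already available here.
    return received
}

fun main() {
    println(receiveGreeting())
}
```

Because fetchGreeting resumes its continuation right away, everything here runs synchronously; with a real asynchronous source the result would arrive later in resumeWith.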

The testLaunch2() logic can also be written with createCoroutine():

import kotlin.coroutines.*
import kotlinx.coroutines.delay

fun testLaunch3() {
    val continuation = object : Continuation<String> {
        override val context: CoroutineContext
            get() = EmptyCoroutineContext

        override fun resumeWith(result: Result<String>) {
            println("Result:" + result.getOrNull())
        }
    }
    // Creates the coroutine without starting it
    val coroutine = block.createCoroutine(continuation)
    // Starts the coroutine
    coroutine.resume(Unit)
}

val block = suspend {
    println("Hello")
    delay(1000L)
    println("World!")
    "Result"
}

fun main() {
    testLaunch3()
    Thread.sleep(2000L)
}


Log
Hello
World!
Result:Result

Process finished with exit code 0

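createCoroutine() creates a coroutine without starting it; the coroutine starts only when resume() is called. The source code of createCoroutine() and startCoroutine() differs very little: the former does not call resume(), the latter does. startCoroutine() can create and start a coroutine in one step precisely because it calls resume(Unit) directly in its source.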
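The difference between creating and starting can be made visible by recording the order of events. This is a small sketch using only the standard library; the function name creationVersusStart and the event strings are invented for the illustration.

```kotlin
import kotlin.coroutines.*

// Records what happens, in order, when a coroutine is created and then resumed.
fun creationVersusStart(): List<String> {
    val events = mutableListOf<String>()
    val body: suspend () -> String = {
        events += "body ran"
        "done"
    }
    val completion = object : Continuation<String> {
        override val context: CoroutineContext = EmptyCoroutineContext
        override fun resumeWith(result: Result<String>) {
            events += "completed with ${result.getOrNull()}"
        }
    }
    val created = body.createCoroutine(completion) // body has not run yet
    events += "created"
    created.resume(Unit)                           // body runs now
    events += "resumed"
    return events
}

fun main() {
    creationVersusStart().forEach(::println)
}
```

The "created" event is recorded before "body ran", which shows that createCoroutine() alone executes none of the body.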

Decompiling the startCoroutine() example to Java gives:

package com.example.myapplication.testcoroutinue;

import kotlin.Metadata;
import kotlin.Result;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.ContinuationKt;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.EmptyCoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.jvm.functions.Function1;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.DelayKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

// @Metadata(...) annotation omitted (compiler-generated binary metadata)
public final class TestCoroutinue888Kt {
   // Static field that Kotlin generates for the block variable
   @NotNull
   private static final Function1 block;

   public static final void main() {
      testLaunch2();
      Thread.sleep(2000L);
   }

   // $FF: synthetic method
   public static void main(String[] var0) {
      main();
   }

   // Getter that Kotlin generates for the block variable
   @NotNull
   public static final Function1 getBlock() {
      return block;
   }

   public static final void testLaunch2() {
      // Anonymous inner class corresponding to the continuation variable
      <undefinedtype> continuation = new Continuation() {
         @NotNull
         public CoroutineContext getContext() {
            return (CoroutineContext)EmptyCoroutineContext.INSTANCE;
         }

         public void resumeWith(@NotNull Object result) {
            String var2 = "Result:" + (String)(Result.isFailure-impl(result) ? null : result);
            System.out.println(var2);
         }
      };
      // block.startCoroutine(continuation) becomes ContinuationKt.startCoroutine(block, (Continuation)continuation)
      ContinuationKt.startCoroutine(block, (Continuation)continuation);
   }

   static {
      // The class generated for the lambda implements the Continuation interface
      Function1 var0 = (Function1)(new Function1((Continuation)null) {
         int label;

         // invokeSuspend() holds the coroutine state-machine logic
         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            String var2;
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               var2 = "Hello";
               System.out.println(var2);
               this.label = 1;
               if (DelayKt.delay(1000L, this) == var3) {
                  return var3;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            var2 = "World!";
            System.out.println(var2);
            return "Result";
         }

         @NotNull
         public final Continuation create(@NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function1 var2 = new <anonymous constructor>(completion);
            return var2;
         }

         public final Object invoke(Object var1) {
            return ((<undefinedtype>)this.create((Continuation)var1)).invokeSuspend(Unit.INSTANCE);
         }
      });
      block = var0;
   }
}

 




public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

Inside startCoroutine(), the first call is createCoroutineUnintercepted().


public expect fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit>

The expect keyword marks a declaration only. Kotlin targets multiple platforms, so the actual implementation has to be supplied by each platform.


public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(probeCompletion)
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

The actual keyword marks this as the JVM implementation of createCoroutineUnintercepted().

createCoroutineUnintercepted() is an extension function, so this refers to the block variable. The class that the compiler generates for a suspend lambda is a subclass of BaseContinuationImpl, so the condition (this is BaseContinuationImpl) is true and create(probeCompletion) is called.
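One way to check this claim on the JVM is to walk the superclass chain of a suspend lambda with reflection. The sketch below assumes the usual JVM code generation for suspend lambdas; superclassNames is a helper made up for this illustration, and the exact internal class names it prints are an implementation detail that may vary between Kotlin versions.

```kotlin
import kotlin.coroutines.Continuation

// Hypothetical helper: collects the class of a value and all of its superclasses.
fun superclassNames(value: Any): List<String> {
    val names = mutableListOf<String>()
    var current: Class<*>? = value.javaClass
    while (current != null) {
        names += current.name
        current = current.superclass
    }
    return names
}

fun main() {
    val block: suspend () -> String = { "Result" }
    // The lambda object itself can be treated as a Continuation.
    println((block as Any) is Continuation<*>)
    // Prints the generated class followed by its internal base classes.
    superclassNames(block).forEach(::println)
}
```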




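public open fun create(completion: Continuation<*>): Continuation<Unit> {
    throw UnsupportedOperationException("create(Continuation) has not been overridden")
}

By default this create() method throws an exception. The class that the compiler generates for the suspend lambda overrides it, as the decompiled code above shows: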


         @NotNull
         public final Continuation create(@NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function1 var2 = new <anonymous constructor>(completion);
            return var2;
         }

The override returns a Continuation object: a new instance of the generated class that wraps completion.



public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

The next call is intercepted(). Its JVM implementation is:




public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
    (this as? ContinuationImpl)?.intercepted() ?: this

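The Continuation is safe-cast (as?) to ContinuationImpl and its intercepted() method is called; if the cast fails, the continuation itself is returned unchanged.

The relevant part of ContinuationImpl: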



internal abstract class ContinuationImpl(
    completion: Continuation<Any?>?,
    private val _context: CoroutineContext?
) : BaseContinuationImpl(completion) {

    @Transient
    private var intercepted: Continuation<Any?>? = null

    public fun intercepted(): Continuation<Any?> =
        intercepted
            ?: (context[ContinuationInterceptor]?.interceptContinuation(this) ?: this)
                .also { intercepted = it }
}

 

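The ContinuationInterceptor found in the context intercepts (wraps) the Continuation, so that the coroutine's execution can be dispatched to a specific thread. The wrapped continuation is cached in the intercepted field, so the interceptor is consulted only once.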
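To make the interception step concrete, here is a minimal sketch of a custom ContinuationInterceptor built with the standard library only. SingleThreadInterceptor and runOnInterceptedThread are hypothetical names for this illustration; real dispatchers in kotlinx.coroutines are considerably more involved.

```kotlin
import java.util.concurrent.CountDownLatch
import java.util.concurrent.Executors
import kotlin.coroutines.*

// Hypothetical interceptor: every resumption is re-posted to one named thread.
class SingleThreadInterceptor(threadName: String) :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {

    private val executor = Executors.newSingleThreadExecutor { runnable ->
        Thread(runnable, threadName).apply { isDaemon = true }
    }

    override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
        object : Continuation<T> {
            override val context: CoroutineContext get() = continuation.context
            override fun resumeWith(result: Result<T>) {
                // Instead of resuming in the caller's thread, hand over to the executor.
                executor.execute { continuation.resumeWith(result) }
            }
        }
}

// Starts a coroutine whose context contains the interceptor and reports
// the name of the thread that the coroutine body actually ran on.
fun runOnInterceptedThread(): String {
    val latch = CountDownLatch(1)
    var observed = ""
    val body: suspend () -> String = { Thread.currentThread().name }
    body.startCoroutine(object : Continuation<String> {
        override val context: CoroutineContext = SingleThreadInterceptor("worker-1")
        override fun resumeWith(result: Result<String>) {
            observed = result.getOrThrow()
            latch.countDown()
        }
    })
    latch.await()
    return observed
}

fun main() {
    println(runOnInterceptedThread())
}
```

startCoroutine() is called from the main thread, but the body reports the worker thread, because intercepted() wrapped the continuation before resume(Unit) was invoked.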

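Finally, resume(Unit):



public fun <T> (suspend () -> T).startCoroutine(
    completion: Continuation<T>
) {
    createCoroutineUnintercepted(completion).intercepted().resume(Unit)
}

resume(Unit) is what actually starts the coroutine: it triggers the first invokeSuspend() call on the generated state machine.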

II. How does launch start a coroutine?

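import kotlinx.coroutines.*

fun main() {
    testLaunch11()
    // Keep the JVM alive until the launched coroutine finishes
    Thread.sleep(2000L)
}

fun testLaunch11() {
    val coroutineScope = CoroutineScope(Job())
    coroutineScope.launch {
        println("Hello")
        delay(1000L)
        println("World!")
    }
}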


Log
Hello
World!

Process finished with exit code 0

Decompiled to Java:

package com.example.myapplication.testcoroutinue;

import kotlin.Metadata;
import kotlin.ResultKt;
import kotlin.Unit;
import kotlin.coroutines.Continuation;
import kotlin.coroutines.CoroutineContext;
import kotlin.coroutines.intrinsics.IntrinsicsKt;
import kotlin.jvm.functions.Function2;
import kotlin.jvm.internal.Intrinsics;
import kotlinx.coroutines.BuildersKt;
import kotlinx.coroutines.CoroutineScope;
import kotlinx.coroutines.CoroutineScopeKt;
import kotlinx.coroutines.CoroutineStart;
import kotlinx.coroutines.DelayKt;
import kotlinx.coroutines.Job;
import kotlinx.coroutines.JobKt;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

// @Metadata(...) annotation omitted (compiler-generated binary metadata)
public final class TestCoroutinue999Kt {
   public static final void main() {
      testLaunch11();
      Thread.sleep(2000L);
   }

   // $FF: synthetic method
   public static void main(String[] var0) {
      main();
   }

   public static final void testLaunch11() {
      CoroutineScope coroutineScope = CoroutineScopeKt.CoroutineScope((CoroutineContext)JobKt.Job$default((Job)null, 1, (Object)null));
      // The Function2 below corresponds to the lambda passed to launch
      BuildersKt.launch$default(coroutineScope, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var3 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            String var2;
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               var2 = "Hello";
               System.out.println(var2);
               this.label = 1;
               if (DelayKt.delay(1000L, this) == var3) {
                  return var3;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            var2 = "World!";
            System.out.println(var2);
            return Unit.INSTANCE;
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      }), 3, (Object)null);
   }
}

Source of launch:


public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    // launch builds a new context from the CoroutineContext that was passed in.
    val newContext = newCoroutineContext(context)
    // launch creates the coroutine object that matches the start mode:
    // a standard one, or a lazy one.
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    // Start the coroutine.
    coroutine.start(start, coroutine, block)
    return coroutine
}
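
The first step, building a new context, can be observed from the outside: elements of the scope's context are inherited, and elements passed to launch take precedence. The sketch below uses CoroutineName to show this; the function name contextNames and the name strings are invented for the illustration.

```kotlin
import kotlinx.coroutines.*

// Returns the CoroutineName seen inside two launched coroutines:
// one that inherits the scope's name, one that overrides it via the parameter.
fun contextNames(): Pair<String?, String?> = runBlocking {
    val scope = CoroutineScope(Job() + CoroutineName("scope-name"))
    val inherited = CompletableDeferred<String?>()
    val overridden = CompletableDeferred<String?>()

    scope.launch {
        inherited.complete(coroutineContext[CoroutineName]?.name)
    }
    scope.launch(CoroutineName("param-name")) {
        overridden.complete(coroutineContext[CoroutineName]?.name)
    }

    val result = inherited.await() to overridden.await()
    scope.cancel()
    result
}

fun main() {
    println(contextNames())
}
```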

coroutine.start():


public abstract class AbstractCoroutine<in T>(
    parentContext: CoroutineContext,
    initParentJob: Boolean,
    active: Boolean
) : JobSupport(active), Job, Continuation<T>, CoroutineScope {

    // ... other members omitted ...

    public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }
}

AbstractCoroutine.kt holds the abstract logic shared by coroutines. The start() method of AbstractCoroutine is what starts the coroutine.


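public enum class CoroutineStart {
    // ... enum entries omitted ...

    public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(completion)
            ATOMIC -> block.startCoroutine(completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(completion)
            LAZY -> Unit // will start lazily
        }
}

The call start(block, receiver, this) invokes the CoroutineStart value as a function, which lands in CoroutineStart.invoke(). (The overload shown above is the one without a receiver; the call from AbstractCoroutine also passes a receiver, and that overload follows the same pattern.)

invoke() starts the coroutine differently depending on the start mode passed to launch. When the mode is ATOMIC, it calls block.startCoroutine(completion). startCoroutineUndispatched(completion) and startCoroutineCancellable(completion) add extra behavior on top of startCoroutine(): the former begins executing the coroutine immediately in the current thread without going through the dispatcher, and the latter allows the coroutine to be cancelled before it starts executing.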
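The effect of the start modes shows up in the order in which code runs. The following sketch records events for DEFAULT, UNDISPATCHED, and LAZY inside runBlocking; the function name startModeOrder and the event strings are made up for this illustration.

```kotlin
import kotlinx.coroutines.*

// Records the order of events for three start modes inside runBlocking.
fun startModeOrder(): List<String> = runBlocking {
    val events = mutableListOf<String>()

    // DEFAULT: the body is dispatched, so it runs only after this coroutine suspends.
    val default = launch(start = CoroutineStart.DEFAULT) { events += "default body" }
    events += "after default launch"

    // UNDISPATCHED: the body runs right here, up to its first suspension point.
    val undispatched = launch(start = CoroutineStart.UNDISPATCHED) { events += "undispatched body" }
    events += "after undispatched launch"

    // LAZY: nothing runs until start() or join() is called.
    val lazy = launch(start = CoroutineStart.LAZY) { events += "lazy body" }
    events += "after lazy launch"

    joinAll(default, undispatched, lazy)
    events
}

fun main() {
    startModeOrder().forEach(::println)
}
```

Only the UNDISPATCHED body runs before the statement that follows its launch call; the DEFAULT and LAZY bodies run once joinAll suspends the outer coroutine.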

startCoroutineCancellable(completion):


public fun <T> (suspend () -> T).startCoroutineCancellable(completion: Continuation<T>): Unit = runSafely(completion) {
    createCoroutineUnintercepted(completion).intercepted().resumeCancellableWith(Result.success(Unit))
}


public actual fun <T> (suspend () -> T).createCoroutineUnintercepted(
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)

    return if (this is BaseContinuationImpl)
        create(probeCompletion)
    else
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function1<Continuation<T>, Any?>).invoke(it)
        }
}

startCoroutineCancellable() also calls createCoroutineUnintercepted(), which calls create(probeCompletion) and therefore ends up in the compiler-generated create() method. In other words, the launch API is only a wrapper around the basic coroutine primitives such as startCoroutine().
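To illustrate the idea of wrapping, here is a deliberately tiny launch-like builder written directly on top of startCoroutine(). MiniJob and miniLaunch are hypothetical names; this sketch has no cancellation, no parent-child structure, and no start modes, so it is not how kotlinx.coroutines implements launch.

```kotlin
import kotlin.coroutines.*

// Hypothetical handle, far simpler than a real Job.
class MiniJob {
    @Volatile
    var isCompleted = false
        private set

    @Volatile
    var failure: Throwable? = null
        private set

    internal fun complete(result: Result<Unit>) {
        failure = result.exceptionOrNull()
        isCompleted = true
    }
}

// Hypothetical builder: creates a completion Continuation and starts the block with it.
fun miniLaunch(
    context: CoroutineContext = EmptyCoroutineContext,
    block: suspend () -> Unit
): MiniJob {
    val job = MiniJob()
    // Continuation(context) { ... } is the standard-library factory function.
    block.startCoroutine(Continuation(context) { result -> job.complete(result) })
    return job
}

fun main() {
    val ok = miniLaunch { println("Hello from miniLaunch") }
    println(ok.isCompleted)

    val failed = miniLaunch { error("boom") }
    println(failed.failure?.message)
}
```

An exception thrown by the block is not rethrown at the call site; it is delivered to the completion continuation as a failed Result, which is the hook a real Job implementation builds on.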
