浅析kotlin协程launch delay调用

Posted 涂程

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅析kotlin协程launch delay调用相关的知识,希望对你有一定的参考价值。

作者:信仰年轻

一、简单的协程例子

这个main方法直接看成程序入口方法,此方法不带suspend修饰。

二、runBlocking 启动一个协程

runBlocking一般不在开发中使用,在测试框架中用到,连接非协程世界到协程世界,runBlocking的代码块都在coroutineScope的作用域内。
上面的main方法开启了两个协程,父协程是runBlocking启动的,子协程是里面launch启动的。父协程要等所有的子协程完成才能结束。
先看下runBlocking的源码:

public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T 
    .....
    val currentThread = Thread.currentThread()   //肯定是主线程
    val contextInterceptor = context[ContinuationInterceptor]  //拦截器,为null
    val eventLoop: EventLoop?        //事件轮询 handler那一套???
    val newContext: CoroutineContext
    if (contextInterceptor == null) 
        eventLoop = ThreadLocalEventLoop.eventLoop  
        newContext = GlobalScope.newCoroutineContext(context + eventLoop)
     else 
        eventLoop = (contextInterceptor as? EventLoop)?.takeIf  it.shouldBeProcessedFromContext() 
            ?: ThreadLocalEventLoop.currentOrNull()
        newContext = GlobalScope.newCoroutineContext(context)
    
    val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop) 
    coroutine.start(CoroutineStart.DEFAULT, coroutine, block)  //start=create+ resumeWith(Unit)
    return coroutine.joinBlocking()

启动协程时runBlocking在这里阻塞了主线程。coroutine的启动使用start,其实start= create + resumeWith(Unit).,EventLoop参考熟悉的handler的流程,肯定存在事件队列,插入事件,取事件的操作,如事件未达处理时间就"阻塞"。

三、launch 启动一个子协程

能够启动协程的方式除了launch,还有async+await,其中async返回的是一个Deferred。

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,start: CoroutineStart = CoroutineStart.DEFAULT,block: suspend CoroutineScope.() -> Unit): Job 
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)  //启动协程了
    return coroutine

public interface Deferred<out T> : Job   //Deferred也是个Job

luanch的返回值是个Job,通过(coroutine.start)启动协程,在这里就作为子协程,都说子协程会继承的父协程的上下文,这是真的吗?真的。

@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext 
    val combined = coroutineContext + context     #1
    val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
    return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
        debug + Dispatchers.Default else debug

newCoroutineContext方法的作用域也是最外层coroutineScope,然后它的上下文是#1中的coroutineContext,很明显,子协程它可以有自己的上下文,两上下文可以进行combined,这里没有的添加调度器Dispatchers.Default,所以线程也还是主线程。coroutineContext + context 上下文能相加,用的就是重载plus操作符

//from  CoroutineContext 
public operator fun plus(context: CoroutineContext): CoroutineContext =
   ......
       

CoroutineContext还支持coroutineContext[Job],就用到了get约定,重载get运算符。

//from  CoroutineContext
public operator fun <E : Element> get(key: Key<E>): E?

coroutineContext[Job]中的key是伴生对象,既然是伴生对象,那就是个final修饰的静态对象,协程上下文类的多个对象可以共享这个key,值会不会覆盖呢?

public interface ContinuationInterceptor : CoroutineContext.Element 
    companion object Key : CoroutineContext.Key<ContinuationInterceptor>

协程调度器、协程拦截器、协程拦截器都是协程上下文,都实现了CoroutineContext.Element。

3.1、从协程编译后的产物发现它的本体

在runBlocking中启动两个子协程,子协程共用一个线程。👇

使用show kotlin bytecode转换后的代码👇

public final class TestKt 
    public static final void main() 
        BuildersKt.runBlocking((CoroutineContext)(new CoroutineName("coroutine#1")), (Function2)(new Function2((Continuation)null) 
            int label;
            @Nullable
            public final Object invokeSuspend(@NotNull Object var1) 
                Object var6 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                switch(this.label) 
                    case 0:
                        ResultKt.throwOnFailure(var1);
                        System.out.println("1  " + Thread.currentThread().name + "---" + coroutineContext[CoroutineName])
                        ExecutorCoroutineDispatcher dispatcher = ThreadPoolDispatcherKt.newSingleThreadContext("my owner Thread ");
                        BuildersKt.launch$default($this$runBlocking, (new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) 
                            int label;
                            @Nullable
                            public final Object invokeSuspend(@NotNull Object $result) 
                                Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                                switch(this.label) 
                                    case 0:
                                        ResultKt.throwOnFailure($result);
                                        System.out.println("2  " + Thread.currentThread().name + "---" + coroutineContext[CoroutineName])
                                        this.label = 1;
                                        if (DelayKt.delay(1000L, this) == var5) 
                                            return var5;
                                        
                                        break;
                                    case 1:
                                        ResultKt.throwOnFailure($result);
                                        break;
                                    default:
                                        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                                
                                System.out.println("3  " + Thread.currentThread().name + "---" + coroutineContext[CoroutineName])
                                return Unit.INSTANCE;
                            

                            @NotNull
                            public final Continuation create(@Nullable Object value, @NotNull Continuation completion) 
                                Intrinsics.checkNotNullParameter(completion, "completion");
                                Function2 var3 = new <anonymous constructor>(completion);
                                var3.L$0 = value;
                                return var3;
                            

                            public final Object invoke(Object var1, Object var2) 
                                return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
                            
                        ), 2, (Object)null);
                        BuildersKt.launch$default($this$runBlocking, (new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) 
                            int label;
                            @Nullable
                            public final Object invokeSuspend(@NotNull Object $result) 
                                Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
                                switch(this.label) 
                                    case 0:
                                        ResultKt.throwOnFailure($result);
                                        System.out.println("21  " + Thread.currentThread().name + "---" + coroutineContext[CoroutineName])
                                        this.label = 1;
                                        if (DelayKt.delay(500L, this) == var5) 
                                            return var5;
                                        
                                        break;
                                    case 1:
                                        ResultKt.throwOnFailure($result);
                                        break;
                                    default:
                                        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                                
                                System.out.println("31  " + Thread.currentThread().name + "---" + coroutineContext[CoroutineName])
                                return Unit.INSTANCE;
                            

                            @NotNull
                            public final Continuation create(@Nullable Object value, @NotNull Continuation completion) 
                                Intrinsics.checkNotNullParameter(completion, "completion");
                                Function2 var3 = new <anonymous constructor>(completion);
                                var3.L$0 = value;
                                return var3;
                            

                            public final Object invoke(Object var1, Object var2) 
                                return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
                            
                        ), 2, (Object)null);
                        System.out.println("4  " + Thread.currentThread().name + "---" + coroutineContext[CoroutineName])
                        return Unit.INSTANCE;
                    default:
                        throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
                
            

            @NotNull
            public final Continuation create(@Nullable Object value, @NotNull Continuation completion) 
                Intrinsics.checkNotNullParameter(completion, "completion");
                Function2 var3 = new <anonymous constructor>(completion);
                var3.L$0 = value;
                return var3;
            

            public final Object invoke(Object var1, Object var2) 
                return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
            
        ));
    
    public static void main(String[] var0) 
        main();
    

想不到协程的bytecode也会存在回调嵌套,这里还只是父协程嵌套两个子协程,如果存在多个协程嵌套,层级多的话,那将也存在回调地狱。
协程能够挂起的本质,其实就是协程体的代码被挂起点一分为二,这里delay是挂起函数,那代码就分为协程体开始到挂起点delay,delay后面到结束,前面的代码走switch的case 0的逻辑,后面的代码就是挂起点恢复之后,再次走下case 1以及后面的代码。

注意调用挂起函数,不一定会真正挂起,具体看返回值是不是IntrinsicsKt.getCOROUTINE_SUSPENDED();,是的就直接return了,下次需再次调用invokeSuspend,就会走label==1了,执行后面的逻辑。
单拎出一个子协程看看:

 BuildersKt.launch$default($this$runBlocking, (new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher), (CoroutineStart)null, (Function2)(new Function2((Continuation)null) 
    int label;
    @Nullable
    public final Object invokeSuspend(@NotNull Object $result)    #2 这个是能找到的
        Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
        switch(this.label) 
            case 0:
                ResultKt.throwOnFailure($result);
                System.out.println("2  " + Thread.currentThread().name + "---" + coroutineContext[CoroutineName])
                this.label = 1;
                if (DelayKt.delay(1000L, this) == var5) 
                    return var5;
                
                break;
            case 1:
                ResultKt.throwOnFailure($result);
                break;
            default:
                throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
        
        System.out.println("3  " + Thread.currentThread().name + "---" + coroutineContext[CoroutineName])
        return Unit.INSTANCE;
    

    @NotNull
    public final Continuation create(@Nullable Object value, @NotNull Continuation completion) 
        Intrinsics.checkNotNullParameter(completion, "completion");
        Function2 var3 = new <anonymous constructor>(completion);
        var3.L$0 = value;
        return var3;
    

    public final Object invoke(Object var1, Object var2)   #1 这里的调用没有找到
        return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
    
), 2, (Object)null);

首先invoke() #1是怎么触发的?没有找到对应的调用地方,有小伙伴找到的话,麻烦告知下。
再次调用invokeSuspend #2是怎么触发的? 是在continuation.resumeWith(xxx)时触发。

众所周知,kotlin的suspend方法编译之后会在最后一个参数的位置放上Continuation<T>

public fun CoroutineScope.launch(context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit): Job 
BuildersKt.launch$default($this$runBlocking, 
(new CoroutineName("coroutine#2")).plus((CoroutineContext)dispatcher), 
(CoroutineStart)null, (Function2)(new Function2((Continuation)null) ),2, (Object)null);

对应看下,CoroutineScope的launch拓展函数,在java调用时就是作为第一个参数传入,第二个参数就是协程上下文,第三个参数是协程启动模式,接下来为一个就是Function2的匿名类对象,接收参数是Continuation类型,最后多出的两个参数是2和(Object)null,这两个参数是啥,暂不清楚。

当查看原始字节码时,可以看到以下声明:

final class com/docwei/kotlindemo/TestKt$main$1$1
extends kotlin/coroutines/jvm/internal/SuspendLambda    //协程体编译成了SuspendLambda类!!!!
implements kotlin/jvm/functions/Function2              //同时也是Fucntion2!!!

INVOKESTATIC kotlinx/coroutines/BuildersKt.launch$default 
(Lkotlinx/coroutines/CoroutineScope;
Lkotlin/coroutines/CoroutineContext;
Lkotlinx/coroutines/CoroutineStart;
Lkotlin/jvm/functions/Function2;
ILjava/lang/Object;)Lkotlinx/coroutines/Job;

SuspendLambda类 可以看下类继承关系

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job 
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block) //#1
    return coroutine

1看起来是我们创建了一个新协程,然后传入block就是我们的协程体。

//from AbstractCoroutine
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.以上是关于浅析kotlin协程launch delay调用的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin协程 - launch原理 笔记

深入kotlin - 创建协程

深入kotlin - 创建协程

kotlin协程硬核解读(2. 协程基础使用&源码浅析)

深入kotlin - 协程

kotlin协程硬核解读(2. 协程基础使用&源码浅析)