Kotlin Coroutine 源码解析 —— 协程是如何运行的
Posted 薛瑄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin Coroutine 源码解析 —— 协程是如何运行的相关的知识,希望对你有一定的参考价值。
前言
在刚接触协程的时候,就产生了一个疑问,协程到底有没有创建线程,如果有,那它和线程池有什么区别?如果没有,为什么在android的主线程中不会 阻塞,了解Android源码的 应该猜到了,肯定是创建了子线程,不然 就会报异常了。那协程又是如何保证任务能切换到主线程呢? 下面就带着这几个问题,来分析协程的源码
示例
用下面的示例 模拟获取token,请求网络的过程。顺序执行,不会阻塞当前线程,
fun main()
GlobalScope.launch
println("开始 ")
val token = requestToken()
val post = createPost(token, "item")
val postResult = processPost(post)
println("结束 ")
Thread.sleep(60*60*1000L) // 阻塞主线程来保证 JVM 存活
suspend fun requestToken():String
println("获取token")
return "token"
suspend fun createPost(token:String,item:String):String
println("创建postItem")
return "$token + $item"
suspend fun processPost(post:String):String
println("执行post请求")
return "$post + result"
来看一下,kotlin转换成java的代码
public final class TestKt
public static final void main()
BuildersKt.launch$default((CoroutineScope)GlobalScope.INSTANCE, (CoroutineContext)null, (CoroutineStart)null, (Function2)(new Function2((Continuation)null)
private CoroutineScope p$;
Object L$0;
Object L$1;
Object L$2;
int label;
@Nullable
public final Object invokeSuspend(@NotNull Object $result)
Object var10000;
label26:
CoroutineScope $this$launch;
String token;
String post;
Object var8;
label25:
var8 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label)
case 0:
ResultKt.throwOnFailure($result);
$this$launch = this.p$;
token = "开始 ";
boolean var9 = false;
System.out.println(token);
this.L$0 = $this$launch;
this.label = 1;
var10000 = TestKt.requestToken(this);
if (var10000 == var8)
return var8;
break;
case 1:
$this$launch = (CoroutineScope)this.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
case 2:
token = (String)this.L$1;
$this$launch = (CoroutineScope)this.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break label25;
case 3:
post = (String)this.L$2;
token = (String)this.L$1;
$this$launch = (CoroutineScope)this.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break label26;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
token = (String)var10000;
this.L$0 = $this$launch;
this.L$1 = token;
this.label = 2;
var10000 = TestKt.createPost(token, "item", this);
if (var10000 == var8)
return var8;
post = (String)var10000;
this.L$0 = $this$launch;
this.L$1 = token;
this.L$2 = post;
this.label = 3;
var10000 = TestKt.processPost(post, this);
if (var10000 == var8)
return var8;
String postResult = (String)var10000;
String var6 = "结束 ";
boolean var7 = false;
System.out.println(var6);
return Unit.INSTANCE;
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion)
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
var3.p$ = (CoroutineScope)value;
return var3;
public final Object invoke(Object var1, Object var2)
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
), 3, (Object)null);
Thread.sleep(3600000L);
// $FF: synthetic method
public static void main(String[] var0)
main();
@Nullable
public static final Object requestToken(@NotNull Continuation $completion)
String var1 = "获取token";
boolean var2 = false;
System.out.println(var1);
return "token";
@Nullable
public static final Object createPost(@NotNull String token, @NotNull String item, @NotNull Continuation $completion)
String var3 = "创建postItem";
boolean var4 = false;
System.out.println(var3);
return token + " + " + item;
@Nullable
public static final Object processPost(@NotNull String post, @NotNull Continuation $completion)
String var2 = "执行post请求";
boolean var3 = false;
System.out.println(var2);
return post + " + result";
从java代码,你应该已经看出一些端倪了,最终肯定是调用了invokeSuspend
,来顺序执行了业务中的三个函数。没错,需要等待前一个函数执行完,才能执行后一个函数。以一种同步的方式,实现了异步编程。那么接下来的重点就是分析,最终如何调用到这个invokeSuspend
函数。
从Kotlin 生成的字节码,可以看出,就是launch中的lambda 参数,最终会转换为 BuildersKt.launch$default 的第四个参数,它的类型是一个匿名内部类
final class com/icisoo/xw/TestKt$main$1 extends kotlin/coroutines/jvm/internal/SuspendLambda implements kotlin/jvm/functions/Function2
可以看到它继承了SuspendLambda ->继承 ContinuationImpl ->继承 BaseContinuationImpl,最终在 resumeWith()
中会调用invokeSuspend()
源码分析
1、launch
从示例的kotlin 代码 开始分析,首先是launch 函数
代码一:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
//把当前context 加入到已有的CoroutineContext,并返回
val newContext = newCoroutineContext(context)
//创建一个协程,它们是继承了AbstractCoroutine
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
StandaloneCoroutine(newContext, active = true)
//执行协程
coroutine.start(start, coroutine, block)
return coroutine
- CoroutineContext 协程上下文
- 包含当前协程scope的信息, 例如Job, ContinuationInterceptor, CoroutineName 和CoroutineId。
- 如果是嵌套的launch,那么里层的launch会使用外层launch的CoroutineContext
- CoroutineContext中,是用map来存这些信息的, map的键是这些类的伴生对象,值是这些类的一个实例。 你可以看到这样的代码,来获取context的信息:
val job = context[Job]
val continuationInterceptor = context[ContinuationInterceptor]
- CoroutineScope 协程的作用域,调用
myScope.cancel()
可以停止该作用域里的所有任务(Activity界面destroy(),子任务没必要继续执行了),下面介绍几种创建作用域的方式:
- 使用lauch, async 等builder创建一个新的子协程。协程(AbstractCoroutine)继承了 CoroutineScope,从父scope中继承了协程上下文 以及Job
- 使用coroutineScope Api创建新scope:
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
这个api主要用于方便地创建一个子域,并且管理域中的所有子协程。注意这个方法只有在所有 block中创建的子协程全部执行完毕后,才会退出。如下示例:
// print输出的结果顺序将会是 1, 2, 3, 4
coroutineScope
delay(1000)
println("1")
launch
delay(6000)
println("3")
println("2")
return@coroutineScope
println("4")
- 继承CoroutineScope. 或直接使用CoroutineScope创建对象,用于处理具有生命周期的对象。
下面看一下Android中Viewmodel的Scope 源码
val ViewModel.viewModelScope: CoroutineScope
get()
val scope: CoroutineScope? = this.getTag(JOB_KEY)
if (scope != null)
return scope
return setTagIfAbsent(JOB_KEY,
CloseableCoroutineScope(SupervisorJob() + Dispatchers.Main.immediate))
internal class CloseableCoroutineScope(context: CoroutineContext) : Closeable, CoroutineScope
override val coroutineContext: CoroutineContext = context
override fun close()
coroutineContext.cancel()
或者使用CoroutineScope创建对象
class Activity
private val mainScope = CoroutineScope(Dispatchers.Default) // use Default for test purposes
fun destroy()
mainScope.cancel()
fun doSomething()
// 在示例中启动了 10 个协程,且每个都工作了不同的时长
repeat(10) i ->
mainScope.launch
delay((i + 1) * 200L) // 延迟 200 毫秒、400 毫秒、600 毫秒等等不同的时间
println("Coroutine $i is done")
2、newCoroutineContext
返回到 代码一,分析CoroutineContext的是如何被添加进去,并返回一个CoroutineContext对象的
代码二
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext
// 这个 + 号,在CoroutineContext中被重载了,以key value的形式保存
val combined = coroutineContext + context
val debug = if (DEBUG) combined + CoroutineId(COROUTINE_ID.incrementAndGet()) else combined
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null)
//这里将一个协程分发器,加入到了CoroutineContext中。
debug + Dispatchers.Default else debug
Dispatchers.Default
创建的对象是 DefaultScheduler,它继承 ExperimentalCoroutineDispatcher ->继承 ExecutorCoroutineDispatcher ->继承 CoroutineDispatcher
CoroutineDispatcher 顾名思义,它对协程的执行进行分发
3、StandaloneCoroutine
返回到 代码一,分析StandaloneCoroutine对象,
代码三
private open class StandaloneCoroutine(
parentContext: CoroutineContext,
active: Boolean
) : AbstractCoroutine<Unit>(parentContext, active)
override fun handleJobException(exception: Throwable): Boolean
handleCoroutineException(context, exception)
return true
StandaloneCoroutine 继承了AbstractCoroutine,后者是一个重点,它实现了Job, Continuation<T>, CoroutineScope
这几个接口。说明它是一个协程,也可以是一个协程作用域,也可以作为一个job,成为CoroutineContext的一项
代码四
public abstract class AbstractCoroutine<in T>(
/**
* The context of the parent coroutine.
*/
@JvmField
protected val parentContext: CoroutineContext,
active: Boolean = true
) : JobSupport(active), Job, Continuation<T>, CoroutineScope
4、start
返回到 代码一,分析coroutine.start(start, coroutine, block)
执行到AbstractCoroutine 中的start
代码五
//R 类型是CoroutineScope T 类型是Unit
//参数receiver 是当前对象,也就是StandaloneCoroutine对象
//参数block 是从 launch的参数传递进来的,也就是示例代码中 launch后的lambda表达式
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T)
initParentJob()
//这里的receiver 和this 是同一个对象
//receiver 作为CoroutineScope 类型
//this 作为 Continuation 类型
start(block, receiver, this)
5、CoroutineStart # invoke
接着来到CoroutineStart的invoke 函数
代码六
public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>) =
when (this)
// 本例中执行这句,那我们就进入这个分析
CoroutineStart.DEFAULT -> block.startCoroutineCancellable(receiver, completion)
CoroutineStart.ATOMIC -> block.startCoroutine(receiver, completion)
CoroutineStart.UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
CoroutineStart.LAZY -> Unit // will start lazily
注意,这里的参数block 是示例代码中的一个匿名内部类的对象
6、 startCoroutineCancellable
代码七
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion)
createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit))
下面依次分析 createCoroutineUnintercepted
、 intercepted
、 resumeCancellableWith
这三个函数
7、createCoroutineUnintercepted
代码八
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit>
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
//
create(receiver, probeCompletion)
else
createCoroutineFromSuspendFunction(probeCompletion)
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
最终会调用到SuspendLambda
类中,示例代码中的匿名内部类继承了SuspendLambda
kotlin/coroutines/jvm/internal/ContinuationImpl.kt
代码九
internal abstract class SuspendLambda(
public override val arity: Int,
completion: Continuation<Any?>?
//调用了ContinuationImpl的一参构造函数,
//completion 是代码一中 创建的StandaloneCoroutine
) : ContinuationImpl(completion), FunctionBase<Any?>, SuspendFunction
constructor(arity: Int) : this(arity, null)
public override fun toString(): String =
if (completion == null)
Reflection.renderLambdaToString(this) // this is lambda
else
super.toString() // this is continuation
SuspendLambda 继承了ContinuationImpl,下面来看看ContinuationImpl
代码十
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion)
//会调用到这个构造函数,
//completion 是代码一中 创建的StandaloneCoroutine
//completion?.context 类型是协程上下文,这里是AbstractCoroutine类中的context
constructor(completion: Continuation<Any?>?) : this(completion, completion?.context)
public override val context: CoroutineContext
get() = _context!!
@Transient
private var intercepted: Continuation<Any?>? = null
// 代码七中 调用intercepted以上是关于Kotlin Coroutine 源码解析 —— 协程是如何运行的的主要内容,如果未能解决你的问题,请参考以下文章
Kotlin之协程coroutine lifecycleScope 和 viewModelScope源码
Kotlin 协程单元测试错误:线程“main @coroutine#1 @coroutine#2”中的异常 java.lang.NullPointerException