Kotlin 协程源码解析
Posted jackingzheng
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程源码解析相关的知识,希望对你有一定的参考价值。
本文适合有协程使用基础并想了解内部原理的开发者
项目地址:kotlinx.coroutines, 分析的版本: v1.3
本文结构
1、什么协程(Coroutines)
2、编译器实现协程的异步操作的同步调用
3、协程上下文切换
4、协程执行效率要高于线程/线程池
5、结构化并发原理
6、总结
一、什么协程(Coroutines)
看下Android文档对协程的定义:协程是一种并发设计模式,您可以在 android 平台上使用它来简化异步执行的代码。协程是在版本 1.3 中添加到 Kotlin 的,它基于来自其他语言的既定概念。
二、编译器实现协程的异步操作的同步调用。
2.1 CPS(Continuation Passing Style)
我们先看一个经典的网络请求并更新UI的例子
private val uiHandler = Handler(Looper.getMainLooper())
fun fetchDocs()
get("https://developer.android.com") result ->
show(result)
fun get(url: String, onSuccess: (String) -> Unit)
Thread
print("进行网络请求 get = $url")
onSuccess("Docs Content")
.start()
fun show(content: String)
uiHandler.post print("显示dialog $content")
从上面例子可以看到我们需要创建子线程进行网络请求,然后通过回调调用show(),show()里面uiHandler切换到UI线程更新UI。能看出我们通过回调来返回异步操作结果。
我们看看经过协程改造后的例子
fun fetchDocs()
CoroutineScope(Dispatchers.Main).launch // Dispatchers.Main
val result = get("https://developer.android.com") // Dispatchers.IO for `get`
show(result) // Dispatchers.Main
suspend fun get(url: String) = withContext(Dispatchers.IO)
print("进行网络请求 get = $url")
return@withContext "success"
fun show(content: String)
print("显示dialog $content")
我们看到get()的回调不见了,使用withContext(Dispatchers.IO)指定在IO线程执行,并同步方式的返回请求结果,show()将结果更新到UI上。
协程使用了什么魔法把异步的调用变成同步调用呢?
我们把上面代码反编译成Java代码,看看他施了什么魔法,代码如下
public final class CoroutineTest2Kt
public static final void fetchDocs()
BuildersKt.launch$default(CoroutineScopeKt.CoroutineScope(Dispatchers.getMain()), null, null, new Function2((Continuation)null)
private CoroutineScope p$;
Object L$0;
// 初始状态 label = 0
int label;
// 该调用在Dispatchers.Main
@Nullable
public final Object invokeSuspend(@NotNull Object $result)
Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
Object var10000;
CoroutineScope $this$launch;
switch(this.label)
case 0:
ResultKt.throwOnFailure($result);
$this$launch = this.p$;
this.L$0 = $this$launch;
// 将label切换为1
this.label = 1;
// 把本continuation传给get(),并挂起自身continuation
var10000 = CoroutineTest2Kt.get("https://developer.android.com", this);
if (var10000 == var4)
return var4;
break;
case 1:
$this$launch = (CoroutineScope)this.L$0;
ResultKt.throwOnFailure($result);
var10000 = $result;
break;
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
// resumeWhite()重新开始,继续执行,在UI显示
String result = (String)var10000;
CoroutineTest2Kt.show(result);
return Unit.INSTANCE;
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion)
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
var3.p$ = (CoroutineScope)value;
return var3;
public final Object invoke(Object var1, Continuation var2)
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
), 3, (Object)null);
@Nullable
public static final Object get(@NotNull final String url, @NotNull Continuation $completion)
return BuildersKt.withContext((CoroutineContext)Dispatchers.getIO(), (Function2)(new Function2((Continuation)null)
private CoroutineScope p$;
int label;
// 该调用在Dispatchers.IO
@Nullable
public final Object invokeSuspend(@NotNull Object $result)
Object var5 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
switch(this.label)
case 0:
// 在IO中处理数据
ResultKt.throwOnFailure($result);
CoroutineScope $this$withContext = this.p$;
String var3 = "进行网络请求 get = " + url;
boolean var4 = false;
System.out.print(var3);
// 返回处理成功
return "success";
default:
throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
@NotNull
public final Continuation create(@Nullable Object value, @NotNull Continuation completion)
Intrinsics.checkParameterIsNotNull(completion, "completion");
Function2 var3 = new <anonymous constructor>(completion);
var3.p$ = (CoroutineScope)value;
return var3;
public final Object invoke(Object var1, Object var2)
return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
), $completion);
public static final void show(@NotNull String content)
Intrinsics.checkParameterIsNotNull(content, "content");
String var1 = "显示dialog " + content;
boolean var2 = false;
System.out.print(var1);
答案就是:CPS(Continuation Passing Style)技术,通过编译的手段,对suspend函数添加continuation参数,Continuation内部是状态机,通过label变量来实现Continuation内部的状态的切换,所以我们虽然写协程代码是同步方式,但编译器会把suspend代码转化成异步操作。这里我们只要知道编译器帮我们生成了Continuation,至于协程内部如何切换协程上下文的,如何实现协程的挂起和恢复的,我们继续往下看。
三、协程上下文切换
3.1 launch的源码分析
我们通过分析launch()方法来回答以下问题。
- Coroutine是如何切换上下文CoroutineContext?
- Coroutine的resumeWith和suspendInvoke如何实现挂起和恢复?
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
// 创建新协程上下文
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyStandaloneCoroutine(newContext, block) else
// 创建标准协程
StandaloneCoroutine(newContext, active = true)
// 启动协程
coroutine.start(start, coroutine, block)
// 返回启动的协程
return coroutine
start(block, receiver, this) 传参中receiver对应launch中的StandaloneCoroutine,this对应StandaloneCoroutine
public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T)
initParentJob()
start(block, receiver, this)
public enum class CoroutineStart
DEFAULT,
LAZY,
ATOMIC,
public operator fun <T> invoke(block: suspend () -> T, completion: Continuation<T>): Unit =
when (this)
// 启动可取消的协程
DEFAULT -> block.startCoroutineCancellable(completion)
ATOMIC -> block.startCoroutine(completion)
UNDISPATCHED -> block.startCoroutineUndispatched(completion)
LAZY -> Unit // will start lazily
// 使用此函数以可取消的方式启动协程,以便在等待调度时可以取消协程。
internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(receiver: R, completion: Continuation<T>) =
runSafely(completion)
// 创建CoroutineUnintercepted
createCoroutineUnintercepted(receiver,completion).intercepted().resumeCancellableWith(Result.success(Unit))
createCoroutineUnintercepted()的实现
JVM createCoroutineUnintercepted实现
public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
receiver: R,
completion: Continuation<T>
): Continuation<Unit>
val probeCompletion = probeCoroutineCreated(completion)
return if (this is BaseContinuationImpl)
create(receiver, probeCompletion)
else
// 进入这个流程 -》创建协程suspend函数
createCoroutineFromSuspendFunction(probeCompletion)
(this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
private inline fun <T> createCoroutineFromSuspendFunction(
completion: Continuation<T>,
crossinline block: (Continuation<T>) -> Any?
): Continuation<Unit>
val context = completion.context
// label == 0 when coroutine is not started yet (initially) or label == 1 when it was
return if (context === EmptyCoroutineContext)
// 省略
else
// 进入这个流程:创建一个ContinuationImpl单例
object : ContinuationImpl(completion as Continuation<Any?>, context)
private var label = 0
override fun invokeSuspend(result: Result<Any?>): Any? =
when (label)
0 ->
label = 1
result.getOrThrow() // Rethrow exception if trying to start with exception (will be caught by BaseContinuationImpl.resumeWith
block(this) // run the block, may return or suspend
1 ->
label = 2
result.getOrThrow() // this is the result if the block had suspended
else -> error("This coroutine had already completed")
intercepted()的实现
public actual fun <T> Continuation<T>.intercepted(): Continuation<T> =
// 调用ContinuationImpl的intercepted()
(this as? ContinuationImpl)?.intercepted() ?: this
ContinuationImpl.intercepted()的实现
调用interceptContinuation(this)获取该continuation包装所提供的continuation,从而截获所有恢复。
internal abstract class ContinuationImpl(
completion: Continuation<Any?>?,
private val _context: CoroutineContext?
) : BaseContinuationImpl(completion)
// ContinuationImpl的intercepted()实现如下,主要是
public fun intercepted(): Continuation<Any?> = intercepted ?:
// 从context找到ContinuationInterceptor,进行拦截
(context[ContinuationInterceptor]?.interceptContinuation(this) ?: this).also intercepted = it
我们看下interceptContinuation
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor
// continuation 对应 ContinuationImpl,CoroutineDispatcher对应MainCoroutineDispatcher
public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> =
DispatchedContinuation(this, continuation)
interceptContinuation()会创建一个DispatchedContinuation
接着resumeCancellableWith()
public fun <T> Continuation<T>.resumeCancellableWith(result: Result<T>): Unit = when (this)
// 进入DispatchedContinuation这个分支
is DispatchedContinuation -> resumeCancellableWith(result)
else -> resumeWith(result)
我们看下resumeCancellableWith()的实现
inline fun resumeCancellableWith(result: Result<T>)
val state = result.toState()
// 先isDispatchNeeded()判断是否进行分发
if (dispatcher.isDispatchNeeded(context))
_state = state
resumeMode = MODE_CANCELLABLE
// 传人runnable给dispatch进行处理
dispatcher.dispatch(context, this)
else
executeUnconfined(state, MODE_CANCELLABLE)
if (!resumeCancelled())
resumeUndispatchedWith(result)
dispatcher执行的Runable的内容是
// DispatchedTask
public final override fun run()
val taskContext = this.taskContext
var fatalException: Throwable? = null
try
val delegate = delegate as DispatchedContinuation<T>
val continuation = delegate.continuation
val context = continuation.context
// 获取当前状态
val state = takeState()
withCoroutineContext(context, delegate.countOrElement)
// 测查状态是否Exception
val exception = getExceptionalResult(state)
// 获取context的job
val job = if (resumeMode.isCancellableMode) context[Job] else null
if (exception == null && job != null && !job.isActive)
// 进行处理job
val cause = job.getCancellationException()
// 如果是可取消job,则进行取消
cancelResult(state, cause)
// continuation会调用resumeWith()
continuation.resumeWithStackTrace(cause)
else
if (exception != null) continuation.resumeWithException(exception)
//
else continuation.resume(getSuccessfulResult(state))
以上是关于Kotlin 协程源码解析的主要内容,如果未能解决你的问题,请参考以下文章
Kotlin Coroutine 源码解析 —— 协程是如何运行的