直面底层:“吹上天”的协程,带你深入源码分析
Posted 鸿洋
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了直面底层:“吹上天”的协程,带你深入源码分析相关的知识,希望对你有一定的参考价值。
本文作者
链接:
https://juejin.im/post/5eea757a51882565ae5cb287
本文由作者授权发布。
对于协程的实战,这里还有一篇:
「直面底层系列」其他相关文章:
这个系列会相对源码解析多一些,大家做好攻克的准备。
本篇文章讲解的内容是在android中使用协程。
在说协程之前,我先说下线程和线程池:
线程是操作系统的内核资源,是CPU调度的最小单位,所有的应用程序都运行在线程上,它是我们实现并发和异步的基础。在Java的API中,Thread是实现线程的基础类,每创建一个Thread对象,操作系统内核就会启动一个线程,在Thread的源码中,它的内部实现是大量的JNI调用,因为线程的实现必须由操作系统提供直接支持,在Linux操作系统中,每一个Java thread对应一个native thread,它们是一一对应的,在Android中,创建Thread的过程中都会调用Linux API中的pthread_create函数。
线程的调用会有存在以下问题:
线程不是轻量级资源,大量创建线程会消耗系统大量资源,传统的阻塞调用会导致系统存在大量的因为阻塞而不能运行的线程,这很浪费系统资源。
线程阻塞状态和运行状态的切换会存在相当大的开销,一直以来都是个优化点,例如:JVM在运行时会对锁进行优化,就像自旋锁、锁粗化和锁消除等等。
线程池(Thread Pool)是一种基于池化思想管理线程的工具,使用线程池有如下好处:
降低资源消耗:通过池化技术重复利用已创建的线程,降低线程的创建和销毁的损耗。
提高响应速度:任务到达时,无需等待线程创建即可立即执行。
提高线程的可管理性:线程是稀缺资源,如果无限制创建,不仅会消耗系统资源,还会因为线程的不合理分布导致资源调度失衡,降低系统的稳定性,使用线程池可以进行统一的分配、调优和监控。
提供更多更强大的功能:线程池具备可拓展性,允许开发人员向其中增加更多的功能。
那协程与线程有什么关系呢?在Java中,协程是基于线程池的API,它并没有脱离Java或者Kotlin已经有的东西。
协程的定义
协程源自Simula和Modula-2语言,它是一种编程思想,并不局限于特定的语言,在1958年的时候,Melvin Edward Conway提出这个术语并用于构建汇编程序。在Android中使用它可以简化异步执行的代码,它是在版本1.3中添加到Kotlin。
下面来介绍如何使用协程:
依赖
要使用协程,需要在build.gradle文件中添加如下依赖:
项目根目录的build.gradle文件:
// build.gradle(AndroidGenericFramework)
ext {
// 省略部分代码
kotlinxCoroutinesVersion = '1.3.1'
// 省略部分代码
}
module的build.gradle文件:
// build.gradle(:app)
dependencies {
// 省略部分代码
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:$kotlinxCoroutinesVersion"
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:$kotlinxCoroutinesVersion"
// 省略部分代码
testImplementation "org.jetbrains.kotlinx:kotlinx-coroutines-test:$kotlinxCoroutinesVersion"
// 省略部分代码
}
org.jetbrains.kotlinx:kotlinx-coroutines-core:协程的核心库,它是协程的公共API,有了这一层公共代码才能使协程在各个平台的接口得到统一。
org.jetbrains.kotlinx:kotlinx-coroutines-android:协程的当前平台对应的平台库,当前平台是Android,它是协程在具体平台的具体实现,因为类似多线程在各个平台的实现方式是有差异的。
org.jetbrains.kotlinx:kotlinx-coroutines-test:协程的测试库,它方便我们在测试中使用协程。
这里要注意的是,这三个库的版本要保持一致。
下面是协程的基础部分:
启动协程
可以通过以下两种方式来启动协程:
launch:可以启动新协程,但是不将结果返回给调用方。
async:可以启动新协程,并且允许使用await暂停函数返回结果。
通常我们使用launch函数从常规函数启动新协程,如果要执行并行分解的话才使用async函数。
async函数可以返回结果,代码如下所示:
// Builders.common.kt
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T> {
val newContext = newCoroutineContext(context)
val coroutine = if (start.isLazy)
LazyDeferredCoroutine(newContext, block) else
DeferredCoroutine<T>(newContext, active = true)
coroutine.start(start, coroutine, block)
return coroutine
}
async函数返回的是Deferred接口,继承Job接口,它是一个非阻塞、可取消的future。
要注意的是launch函数和async函数以不同的方式处理异常,在使用async函数时候可以调用await函数得到结果,如果出现异常将会以静默方式抛出,也就是说不会出现在崩溃指标中,也不会在logcat中注明。
await函数是针对单个协程的,awaitAll函数是针对多个协程的,它们都能保证这些协程在返回结果之前完成。
通常协程有三种方式创建,如下所示:
runBlocking
使用runBlocking顶层函数来创建协程,这种方式是线程阻塞的,适用于单元测试,一般业务开发不会使用这种,示例代码如下所示:
runBlocking {
login()
}
runBlocking函数源码如下所示:
// Builders.kt
@Throws(InterruptedException::class)
// 第一个参数context是协程上下文,默认值为EmptyCoroutineContext,第二个参数是带有CoroutineScope接受者对象,不接受任何参数返回T的Lambda表达式
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T {
val currentThread = Thread.currentThread()
val contextInterceptor = context[ContinuationInterceptor]
val eventLoop: EventLoop?
val newContext: CoroutineContext
if (contextInterceptor == null) {
// 如果没有指定调度程序(dispatcher),就创建或者使用私有事件循环
eventLoop = ThreadLocalEventLoop.eventLoop
newContext = GlobalScope.newCoroutineContext(context + eventLoop)
} else {
// 看看上下文(context)的拦截器(interceptor)是否是一个我们将要使用的事件循环(用来支持TestContext)或者如果存在thread-local事件循环,就使用它来避免阻塞,不过它不会去新建一个
eventLoop = (contextInterceptor as? EventLoop)?.takeIf { it.shouldBeProcessedFromContext() }
?: ThreadLocalEventLoop.currentOrNull()
newContext = GlobalScope.newCoroutineContext(context)
}
val coroutine = BlockingCoroutine<T>(newContext, currentThread, eventLoop)
coroutine.start(CoroutineStart.DEFAULT, coroutine, block)
return coroutine.joinBlocking()
}
GlobalScope
使用GlobalScope单例对象,并且调用launch函数来创建协程,这种方式不会阻塞线程,但是不推荐在Android中使用这种方式,因为它的生命周期是整个应用程序的生命周期,如果处理不好,容易导致内存泄漏,而且不能取消,示例代码如下所示:
GlobalScope.launch {
login()
}
GlobalScope源码如下所示:
// CoroutineScope.kt
public object GlobalScope : CoroutineScope {
/**
* Returns [EmptyCoroutineContext].
*/
override val coroutineContext: CoroutineContext
get() = EmptyCoroutineContext
}
EmptyCoroutineContext是一个空的协程上下文。
CoroutineScope
使用CoroutineScope对象,并且调用launch函数来创建协程,这种方式可以通过传入的CoroutineContext来控制协程的生命周期,推荐使用这种方式,示例代码如下所示:
CoroutineScope(Dispatchers.IO).launch {
login()
}
Dispatchers.IO是CoroutineContext其中一种类型,下面会讲到这个。
CoroutineScope可以管理一个或者多个相关的协程,可以使用它在指定范围内启动新协程。
与调度程序不同,CoroutineScope不运行协程。
CoroutineScope的一项重要功能就是在用户离开你应用中的内容区域时停止执行协程,它可以确保所有正在运行的操作都能正确停止。
CoroutineScope源码如下所示:
// CoroutineScope.kt
// 参数block是带有CoroutineScope接受者对象,不接受任何参数返回R的Lambda表达式
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R =
suspendCoroutineUninterceptedOrReturn { uCont ->
val coroutine = ScopeCoroutine(uCont.context, uCont)
coroutine.startUndispatchedOrReturn(coroutine, block)
}
在Android平台上,协程有助于解决两个主要问题:
管理长时间运行的任务,如果管理不当,这些任务可能会阻塞主线程并导致你的应用界面冻结。
提供主线程安全性,或者从主线程安全地调用网络或者磁盘操作。
1. 管理长时间运行的任务
在Android平台上,每个应用都有一个用于处理界面并且管理用户交互的主线程。如果你的应用为主线程分配的工作太多,会导致界面呈现速度缓慢或者界面冻结,对触摸事件的响应速度很慢,例如:网络请求、JSON解析、写入或者读取数据库、遍历大型列表,这些都应该在工作线程完成。
协程在常规函数的基础上添加了两项操作,用于处理长时间运行的任务。在invoke或者call和return之外,协程添加了suspend和resume:
suspend用于暂停执行当前协程,并保存所有的局部变量。
resume用于让已暂停的协程从其暂停处继续执行。
要调用suspend函数,只能从其他suspend函数进行调用,或者通过使用协程构建器(例如:launch)来启动新的协程。
Kotlin使用堆栈帧来管理要运行哪个函数以及所有的局部变量。暂停协程时会复制并保存当前的堆栈帧以供稍后使用;恢复协程时会将堆栈帧从其保存位置复制回来,然后函数再次开始运行。
编译器会在编译期间对被suspend修饰符修饰的函数进行续体传递风格(CPS)变换,它会改变suspend函数的函数签名,我举个例子:
await函数是个suspend函数,函数签名如下所示:
suspend fun <T> CompletableFuture<T>.await(): T
在编译期间进行**续体传递风格(CPS)**变换后:
fun <T> CompletableFuture<T>.await(continuation: Continuation<T>): Any?
我们可以看到进行续体传递风格(CPS)变换后的函数多了一个类型为Continuation的参数,Continuation代码如下所示:
interface Continuation<in T> {
val context: CoroutineContext
fun resumeWith(result: Result<T>)
}
续体包装了协程在挂起之后继续执行的代码,在编译过程中,一个完整的协程被分割成一个又一个续体,在await函数的挂起结束之后,它会调用参数continuation的resumeWith函数来恢复执行await之后的代码。
进行续体传递风格(CPS)变换后的函数返回值是Any?,这是因为这个函数发生变换后,它会返回一个类型为T(返回它本身)和COROUTINE_SUSPENDED标记的联合类型,因为Kotlin没有联合类型语法,所以就使用最泛化的类型Any?来表示,COROUTINE_SUSPENDED标记表示的是这个suspend函数会发生事实上的挂起操作。
在下面介绍的三个调度程序,它们都会继承CoroutineDispatcher类,源码如下所示:
// CorountineDispatcher.kt
public abstract class CoroutineDispatcher :
AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor {
// 省略部分代码
}
这个类实现了ContinuationInterceptor接口,源码如下所示:
// ContinuationInterceptor.kt
@SinceKotlin("1.3")
public interface ContinuationInterceptor : CoroutineContext.Element {
// 定义上下文拦截器的键
companion object Key : CoroutineContext.Key<ContinuationInterceptor>
// 返回原始封装的Continuation,从而拦截所有的恢复
public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T>
// 初始化时,为interceptContinuation返回的Continuation实例调用
public fun releaseInterceptedContinuation(continuation: Continuation<*>) {
/* do nothing by default */
}
public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? {
// getPolymorphicKey专门用于ContinuationInterceptor的键
@OptIn(ExperimentalStdlibApi::class)
if (key is AbstractCoroutineContextKey<*, *>) {
@Suppress("UNCHECKED_CAST")
return if (key.isSubKey(this.key)) key.tryCast(this) as? E else null
}
@Suppress("UNCHECKED_CAST")
return if (ContinuationInterceptor === key) this as E else null
}
public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext {
// minusPolymorphicKey专门用于ContinuationInterceptor的键
@OptIn(ExperimentalStdlibApi::class)
if (key is AbstractCoroutineContextKey<*, *>) {
return if (key.isSubKey(this.key) && key.tryCast(this) != null) EmptyCoroutineContext else this
}
return if (ContinuationInterceptor === key) EmptyCoroutineContext else this
}
}
这个接口叫做续体拦截器,它负责拦截协程在恢复后执行的代码(也就是续体),并且将其在指定线程或者线程池恢复。
在编译期间,每个suspend函数会被编译成实现了Continuation接口的匿名类,其实调用suspend函数时,并不一定会挂起协程,举个例子:有个网络请求的逻辑调用了await函数,如果网络请求还没得到结果,那么协程就会被挂起,直到得到结果为止,续体拦截器只会拦截发生挂起后的挂起点的续体,而对于没发生挂起的挂起点,协程会调用resumeWith函数,而不再需要续体拦截器处理。
续体拦截器会缓存拦截过的续体,并且在不需要它的时候调用releaseInterceptedContinuation函数释放。
2. 使用协程确保主线程安全
Kotlin协程使用调度程序来确定哪些线程用于执行协程,所有协程都必须在调度程序中运行,协程可以可以暂停,而调度程序负责将其恢复。
Kotlin提供了三个调度程序,可以使用它们来指定应在何处运行协程:
Dispatchers.Main:使用此调度程序可在Android主线程上运行协程,只能用于界面交互和执行快速工作,例如:调用suspend函数、运行Android界面框架操作和更新LiveData对象。
Dispatchers.Default:此调度程序适合在主线程之外执行占用大量CPU资源的工作,例如:对列表排序和解析JSON。
Dispatchers.IO:此调度程序适合在主线程之外执行磁盘或者网络I/O,例如:操作数据库(使用Room)、向文件中写入数据或者从文件中读取数据和运行任何网络操作。
我们可以调用withContext函数,并且传入相应的协程上下文(CoroutineContext)就可以调度线程。
withContext函数是个suspend函数,它可以在不引用回调的情况下控制任何代码行的线程池,因此可以将其应用于非常小的函数,示例代码如下所示:
suspend fun getUserInfo() { // Dispatchers.Main
val data = fetchUserInfo() // Dispatchers.Main
show(data) // Dispatchers.Main
}
suspend fun fetchUserInfo() { // Dispatchers.Main
withContext(Dispatchers.IO) { // Dispatchers.IO
// 执行网络请求 // Dispatchers.IO
} // Dispatchers.Main
}
在示例代码中,getUserInfo函数在主线程上执行,它可以安全地调用fetchUserInfo函数,在工作线程中执行网络请求,并且挂起,在withContext代码块执行完成后,主线程上的协程就会根据fetchUserInfo函数的结果恢复后面的逻辑。
相比于回调实现,使用withContext函数不会增加额外的开销,在某些情况下,甚至优于回调实现,例如:某个函数执行了很多次的网络请求,使用外部withContext函数可以让Kotlin停留在同一个调度程序,并且只切换一次线程,此外,Kotlin还优化了Dispatchers.Default和Dispatchers.IO之间的切换,以尽可能避免线程切换。
要注意的是,Dispatchers.Default和Dispatchers.IO都是使用线程池的调度程序,它们不能保证代码块在同一线程从上到下执行,因为在某些情况下,Kotlin会在挂起和恢复后,将执行工作移交给另外一个线程,这意味着,对于整个withContext代码块,线程局部变量并不指向同一个值。
Dispatchers.Main
源码如下所示:
// Dispatchers.kt
public actual object Dispatchers {
// 省略部分代码
@JvmStatic
public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher
// 省略部分代码
}
然后看下MainDispatcherLoader.dispatcher,源码如下所示:
// MainDispatchers.kt
internal object MainDispatcherLoader {
// 省略部分代码
@JvmField
val dispatcher: MainCoroutineDispatcher = loadMainDispatcher()
// 省略部分代码
}
变量dispatcher为MainCoroutineDispatcher类型,MainCoroutineDispatcher是个抽象类,然后它的其中一个实现类是包装类(sealed class)HandlerDispatcher,也就是它的子类肯定是在这个类的所在的文件中,然后我找到HandlerContext这个类,这个类是HandlerDispatcher的唯一子类,源码如下所示:
// MainCoroutineDispatcher.kt
internal class HandlerContext private constructor(
private val handler: Handler,
private val name: String?,
private val invokeImmediately: Boolean
) : HandlerDispatcher(), Delay {
// HandlerContext的构造函数,参数handler为要传进来的Handler,参数name为用于调试的可选名称
public constructor(
handler: Handler,
name: String? = null
) : this(handler, name, false)
@Volatile
private var _immediate: HandlerContext? = if (invokeImmediately) this else null
override val immediate: HandlerContext = _immediate ?:
HandlerContext(handler, name, true).also { _immediate = it }
// 判断是否需要调度,参数context为CoroutineContext
override fun isDispatchNeeded(context: CoroutineContext): Boolean {
// 判断invokeImmediately的值或者是否是同一个线程
return !invokeImmediately || Looper.myLooper() != handler.looper
}
// 调度线程,参数context为CoroutineContext,参数block为Runnable
override fun dispatch(context: CoroutineContext, block: Runnable) {
// 调用Handler的post方法,将Runnable添加到消息队列中,这个Runnable会在这个Handler附加在线程上的时候运行
handler.post(block)
}
override fun scheduleResumeAfterDelay(timeMillis: Long, continuation: CancellableContinuation<Unit>) {
val block = Runnable {
with(continuation) { resumeUndispatched(Unit) }
}
// 调用Handler的postDelayed方法,将Runnable添加到消息队列中,并且在指定的时间结束后运行
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
continuation.invokeOnCancellation { handler.removeCallbacks(block) }
}
override fun invokeOnTimeout(timeMillis: Long, block: Runnable): DisposableHandle {
// 调用Handler的postDelayed方法,将Runnable添加到消息队列中,并且在指定的时间结束后运行
handler.postDelayed(block, timeMillis.coerceAtMost(MAX_DELAY))
return object : DisposableHandle {
override fun dispose() {
// 调用Handler的removeCallbacks方法,删除消息队列中的Runnable
handler.removeCallbacks(block)
}
}
}
override fun toString(): String =
if (name != null) {
if (invokeImmediately) "$name [immediate]" else name
} else {
handler.toString()
}
override fun equals(other: Any?): Boolean = other is HandlerContext && other.handler === handler
override fun hashCode(): Int = System.identityHashCode(handler)
}
然后我们找下调用HandlerContext的构造函数的地方,源码如下所示:
// HandlerDispatcher.kt
@JvmField
@Deprecated("Use Dispatchers.Main instead", level = DeprecationLevel.HIDDEN)
internal val Main: HandlerDispatcher? = runCatching { HandlerContext(Looper.getMainLooper().asHandler(async = true), "Main") }.getOrNull()
可以看到传入了Looper.getMainLooper方法,也就是应用程序的主循环程序(Main Looper),它位于应用程序的主线程中。
可以看到使用了很多Handler相关的方法,也就是它还是依赖于Android的消息机制。
Dispatchers.Default
源码如下所示:
// Dispatchers.kt
public actual object Dispatchers {
// 省略部分代码
@JvmStatic
public actual val Default: CoroutineDispatcher = createDefaultDispatcher()
// 省略部分代码
}
然后看下createDefaultDispatcher函数,源码如下所示:
// CoroutineContext.kt
internal actual fun createDefaultDispatcher(): CoroutineDispatcher =
if (useCoroutinesScheduler) DefaultScheduler else CommonPool
这里会根据内部变量(internal val)useCoroutinesScheduler判断返回是DefaultScheduler还是CommonPool,useCoroutinesScheduler源码如下所示:
// CoroutineContext.kt
internal const val COROUTINES_SCHEDULER_PROPERTY_NAME = "kotlinx.coroutines.scheduler"
internal val useCoroutinesScheduler = systemProp(COROUTINES_SCHEDULER_PROPERTY_NAME).let { value ->
when (value) {
null, "", "on" -> true
"off" -> false
else -> error("System property '$COROUTINES_SCHEDULER_PROPERTY_NAME' has unrecognized value '$value'")
}
}
这个内部变量(internal val)useCoroutinesScheduler是根据JVM的System.getProperty方法获取的,通过传入 kotlinx.coroutines.scheduler"作为键(key),返回的值为on,useCoroutinesScheduler为true;返回的值是off,useCoroutinesScheduler为false。
先看下DefaultScheduler这种情况,源码如下所示:
// Dispatcher.kt
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
override fun close() {
throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
}
override fun toString(): String = DEFAULT_SCHEDULER_NAME
@InternalCoroutinesApi
@Suppress("UNUSED")
public fun toDebugString(): String = super.toString()
}
它继承ExperimentalCoroutineDispatcher类,它是个不稳定的类,以后可能会改变,可以看下这个类的dispatch函数,这个函数负责调度线程,源码如下所示:
// Dispatcher.kt
@InternalCoroutinesApi
open class ExperimentalCoroutineDispatcher(
// 核心线程数
private val corePoolSize: Int,
// 最大线程数
private val maxPoolSize: Int,
// 调度器保持存活的时间(单位:纳秒)
private val idleWorkerKeepAliveNs: Long,
// 调度器名字
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
@Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
// 省略部分代码
override fun dispatch(context: CoroutineContext, block: Runnable): Unit =
try {
// 调用了coroutineScheduler的dispatch函数
coroutineScheduler.dispatch(block)
} catch (e: RejectedExecutionException) {
DefaultExecutor.dispatch(context, block)
}
// 省略部分代码
}
看下CoroutineScheduler这个类,然后再看下它的dispatch函数,源码如下所示:
// CoroutineScheduler.kt
@Suppress("NOTHING_TO_INLINE")
internal class CoroutineScheduler(
// 核心线程数
@JvmField val corePoolSize: Int,
// 最大线程数
@JvmField val maxPoolSize: Int,
// 调度器保持存活的时间(单位:纳秒)
@JvmField val idleWorkerKeepAliveNs: Long = IDLE_WORKER_KEEP_ALIVE_NS,
// 调度器名字
@JvmField val schedulerName: String = DEFAULT_SCHEDULER_NAME
) : Executor, Closeable {
init {
require(corePoolSize >= MIN_SUPPORTED_POOL_SIZE) {
"Core pool size $corePoolSize should be at least $MIN_SUPPORTED_POOL_SIZE"
}
require(maxPoolSize >= corePoolSize) {
"Max pool size $maxPoolSize should be greater than or equals to core pool size $corePoolSize"
}
require(maxPoolSize <= MAX_SUPPORTED_POOL_SIZE) {
"Max pool size $maxPoolSize should not exceed maximal supported number of threads $MAX_SUPPORTED_POOL_SIZE"
}
require(idleWorkerKeepAliveNs > 0) {
"Idle worker keep alive time $idleWorkerKeepAliveNs must be positive"
}
}
// 省略部分代码
fun dispatch(block: Runnable, taskContext: TaskContext = NonBlockingContext, tailDispatch: Boolean = false) {
// 用于支持虚拟时间
trackTask()
val task = createTask(block, taskContext)
// 尝试将任务提交到本地队列,并且根据结果采取执行相关的逻辑
val currentWorker = currentWorker()
val notAdded = currentWorker.submitToLocalQueue(task, tailDispatch)
if (notAdded != null) {
if (!addToGlobalQueue(notAdded)) {
// 全局队列在最后一步关闭时不应该接受更多的任务
throw RejectedExecutionException("$schedulerName was terminated")
}
}
val skipUnpark = tailDispatch && currentWorker != null
if (task.mode == TASK_NON_BLOCKING) {
if (skipUnpark) return
// 执行任务
signalCpuWork()
} else {
// 增加阻塞任务
signalBlockingWork(skipUnpark = skipUnpark)
}
}
// 省略部分代码
}
可以看到CoroutineScheduler实现了Executor接口,在Java中线程池的核心实现类是ThreadPoolExecutor类,它也是实现了Executor接口,所以这个CoroutineScheduler是协程中线程池的一种实现。
corePoolSize是核心线程数量,它是通过调用JVM的Runtime.getRuntime().availableProcessors()方法得到当前处理器可运行的线程数,它的缺省值强制设置为至少两个线程。
maxPoolSize是最大线程数量,最小值为corePoolSize,最大值为(1 shl BLOCKING_SHIFT) - 2,BLOCKING_SHIFT为21,也就是1向左位移21位再减去2,确保Runtime.getRuntime().availableProcessors()得到的值再乘以2在最小值和最大值之间。
这个函数做的事情就是将传入的任务压入任务栈,然后调用signalCpuWork执行任务或者调用signalBlockingWork来增加阻塞任务。
然后再看下另外一种情况:CommonPool,源码如下所示:
// CommonPool.kt
internal object CommonPool : ExecutorCoroutineDispatcher() {
// 省略部分代码
private fun createPool(): ExecutorService {
if (System.getSecurityManager() != null) return createPlainPool()
// ForkJoinPool类的反射,方便它在JDK6上可以运行(这里没有),如果没有就使用普通线程池
val fjpClass = Try { Class.forName("java.util.concurrent.ForkJoinPool") }
?: return createPlainPool()
// 尝试使用commonPool,除非显式指定了并行性或者在调试privatePool模式
if (!usePrivatePool && requestedParallelism < 0) {
Try { fjpClass.getMethod("commonPool")?.invoke(null) as? ExecutorService }
?.takeIf { isGoodCommonPool(fjpClass, it) }
?.let { return it }
}
// 尝试创建私有ForkJoinPool实例
Try { fjpClass.getConstructor(Int::class.java).newInstance(parallelism) as? ExecutorService }
?. let { return it }
// 使用普通线城市
return createPlainPool()
}
// 省略部分代码
// 创建普通线程池
private fun createPlainPool(): ExecutorService {
val threadId = AtomicInteger()
// 使用Java的newFixedThreadPool线程池
return Executors.newFixedThreadPool(parallelism) {
Thread(it, "CommonPool-worker-${threadId.incrementAndGet()}").apply { isDaemon = true }
}
}
// 省略部分代码
// 调度线程
override fun dispatch(context: CoroutineContext, block: Runnable) {
try {
(pool ?: getOrCreatePoolSync()).execute(wrapTask(block))
} catch (e: RejectedExecutionException) {
unTrackTask()
DefaultExecutor.enqueue(block)
}
}
// 省略部分代码
}
可以看到使用CommonPool,其实就是使用Java的newFixedThreadPool线程池。
Dispatchers.Default调度器的核心线程池和处理器的线程数是相等的,因此它可以用于处理密集型计算,适合在主线程之外执行占用大量CPU资源的工作,例如:对列表排序和解析JSON,和RxJava的计算线程池的思想有点类似。
Dispatchers.IO
源码如下所示:
// Dispatchers.kt
public actual object Dispatchers {
// 省略部分代码
@JvmStatic
public val IO: CoroutineDispatcher = DefaultScheduler.IO
}
可以看到IO其实是DefaultScheduler的一个成员变量,源码如下所示:
internal object DefaultScheduler : ExperimentalCoroutineDispatcher() {
// 调用了父类ExperimentalCoroutineDispatcher的blocking函数
val IO = blocking(systemProp(IO_PARALLELISM_PROPERTY_NAME, 64.coerceAtLeast(AVAILABLE_PROCESSORS)))
override fun close() {
throw UnsupportedOperationException("$DEFAULT_SCHEDULER_NAME cannot be closed")
}
override fun toString(): String = DEFAULT_SCHEDULER_NAME
@InternalCoroutinesApi
@Suppress("UNUSED")
public fun toDebugString(): String = super.toString()
}
可以看下它的父类ExperimentalCoroutineDispatcher的blocking函数,源码如下所示:
// Dispatcher.kt
@InternalCoroutinesApi
open class ExperimentalCoroutineDispatcher(
// 核心线程数
private val corePoolSize: Int,
// 最大线程数
private val maxPoolSize: Int,
// 调度器保持存活的时间(单位:纳秒)
private val idleWorkerKeepAliveNs: Long,
// 调度器名字
private val schedulerName: String = "CoroutineScheduler"
) : ExecutorCoroutineDispatcher() {
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE,
schedulerName: String = DEFAULT_SCHEDULER_NAME
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS, schedulerName)
@Deprecated(message = "Binary compatibility for Ktor 1.0-beta", level = DeprecationLevel.HIDDEN)
constructor(
corePoolSize: Int = CORE_POOL_SIZE,
maxPoolSize: Int = MAX_POOL_SIZE
) : this(corePoolSize, maxPoolSize, IDLE_WORKER_KEEP_ALIVE_NS)
// 省略部分代码
public fun blocking(parallelism: Int = BLOCKING_DEFAULT_PARALLELISM): CoroutineDispatcher {
require(parallelism > 0) { "Expected positive parallelism level, but have $parallelism" }
// 创建LimitingDispatcher对象
return LimitingDispatcher(this, parallelism, TASK_PROBABLY_BLOCKING)
}
// 省略部分代码
}
看下LimitingDispatcher类,源码如下所示:
// Dispatcher.kt
private class LimitingDispatcher(
// final变量dispatcher为ExperimentalCoroutineDispatcher类型
val dispatcher: ExperimentalCoroutineDispatcher,
val parallelism: Int,
override val taskMode: Int
) : ExecutorCoroutineDispatcher(), TaskContext, Executor {
// 省略部分代码
// 调度线程,调用dispatch(block: Runnable, tailDispatch: Boolean)函数
override fun dispatch(context: CoroutineContext, block: Runnable) = dispatch(block, false)
private fun dispatch(block: Runnable, tailDispatch: Boolean) {
var taskToSchedule = block
while (true) {
// 提交正在执行的任务槽
val inFlight = inFlightTasks.incrementAndGet()
// 快速路径,如果没有达到并行性限制,就会分派任务并且返回
if (inFlight <= parallelism) {
// 调用ExperimentalCoroutineDispatcher的dispatchWithContext函数
dispatcher.dispatchWithContext(taskToSchedule, this, tailDispatch)
return
}
// 达到并行性限制后就将任务添加到队列中
queue.add(taskToSchedule)
if (inFlightTasks.decrementAndGet() >= parallelism) {
return
}
taskToSchedule = queue.poll() ?: return
}
}
// 省略部分代码
}
可以看到其实Dispatchers.Default调度器和Dispatchers.IO调度器是共用同一个线程池的。
3. 指定CoroutineScope
在定义协程时,必须指定其CoroutineScope,CoroutineScope可以管理一个或者多个相关的协程,可以使用它在指定范围内启动新协程。
与调度程序不同,CoroutineScope不运行协程。
CoroutineScope的一项重要功能就是在用户离开应用中内容区域时停止执行协程,可以确保所有正在运行的操作都能正确停止。
在Android平台上,可以将CoroutineScope实现与组件中生命周期相关联,例如:Lifecycle和ViewModel,这样可以避免内存泄漏和不再对与用户相关的Activity或者Fragment执行额外的工作,例如:ViewModelScope、LifecycleScope和liveData。
添加KTX依赖项
对于ViewModelScope,请使用androidx.lifecycle:lifecycle-viewmodel-ktx:2.1.0-beta01或更高版本。
对于LifecycleScope,请使用androidx.lifecycle:lifecycle-runtime-ktx:2.2.0-alpha01或更高版本。
对于liveData,请使用androidx.lifecycle:lifecycle-livedata-ktx:2.2.0-alpha01或更高版本。
生命周期感知型CoroutineScope
ViewModelScope
为ViewModel定义ViewModelScope,如果ViewModel已清除,那么在这个范围内的启动的协程就会自动取消,如果你的工作需要在ViewModel处于活动状态下才能完成的话,可以使用它,示例代码如下所示:
class MyViewModel : ViewModel() {
init {
viewModelScope.launch {
// 当ViewModel被清除,这个范围内启动的协程就会自动取消
}
}
}
LifecycleScope
为每个Lifecycle对象定义LifecycleScope,在这个范围内启动的协程会在Lifecycle被销毁的时候自动取消,可以通过lifecycle.coroutineScope或者lifecycleOwner.lifecycleScope属性访问Lifecycle的CoroutineScope,示例代码如下所示:
class MyFragment : Fragment() {
override fun onViewCreated(view: View, savedInstanceState: Bundle?) {
super.onViewCreated(view, savedInstanceState)
viewLifecycleOwner.lifecycleScope.launch {
// 在这个范围内启动的协程会在Lifecycle被销毁的时候自动取消
}
}
}
即使CoroutineScope提供了适当的方法来自动取消长时间运行的操作,在某些情况下,可能需要暂停执行代码块。
例如:要使用FragmentTransaction,Fragment的生命周期至少处于STARTED状态,对于这种情况,Lifecycle提供了这些方法:lifecycle.whenCreated、lifecycle.whenStarted和lifecycle.whenResumed,如果Lifecycle未至少处于所需的最低状态,那么就会暂停在这些代码块内运行的任何协程,示例代码如下所示:
class MyFragment : Fragment() {
init {
// 在Fragment的构造函数可以安全地启动
lifecycleScope.launch {
whenCreateed {
// 只有当Fragment的生命周期至少处于CREATED状态下,这个代码块才会执行,并且可以调用其他suspend函数
}
whenStarted {
// 只有当Fragment的生命周期至少处于STARTED状态下,这个代码块才会执行,并且可以调用其他suspend函数
}
whenResumed {
// 只有当Fragment的生命周期至少处于RESUMED状态下,这个代码块才会执行,并且可以调用其他suspend函数
}
}
}
}
liveData
使用LiveData时,我们可能需要异步计算值,例如:获取了用户信息后显示到界面,在这种情况下,我们可以使用liveData构建器函数调用suspend函数,并且将结果作为LiveData对象返回,示例代码如下所示:
val userInfoData: LiveData<UserInfoData> = liveData {
// getUserInfo函数是一个suspend函数
val data = remoteSource.getUserInfo()
emit(data)
}
liveData构建块用作协程和LiveData之间的结构化并发基元。
当LiveData变为活动状态的时候,代码块开始执行;当LiveData变为非活动状态的时候,代码块会在可配置的超时过后自动取消。如果代码块在完成前取消,则会在LiveData再次变成活动状态后重启;如果在上次运行中成功完成,则不会重启。要注意的是,代码块只有在自动取消的情况下才会重启,如果代码块由于任何其他原因(例如:抛出CancelationException)而取消,则不会重启。
我们可以从代码块中发出多个值,每次调用emit函数都会暂停执行代码块,直到在主线程上设置LiveData值。
我的GitHub:
https://github.com/TanJiaJunBeyond
Android通用框架:
https://github.com/TanJiaJunBeyond/AndroidGenericFramework
推荐阅读:
如果你想要跟大家分享你的文章,欢迎投稿~
┏(^0^)┛明天见!
以上是关于直面底层:“吹上天”的协程,带你深入源码分析的主要内容,如果未能解决你的问题,请参考以下文章