Kotlin Coroutine Basics: CoroutineContext

Posted by 且听真言


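I. The coroutine context: CoroutineContext

CoroutineContext is the context of a coroutine, and it shows up in many places.

1. The first parameter of the CoroutineScope.launch extension function is a CoroutineContext; its default value is EmptyCoroutineContext.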

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    ......
}
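To make the role of that first parameter concrete, here is a minimal sketch. It assumes kotlinx.coroutines is on the classpath, and `dispatcherChoices` is a name made up for this illustration. A child started with the default EmptyCoroutineContext keeps the dispatcher of its parent scope, while one started with an explicit dispatcher uses that one instead.

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import kotlin.coroutines.ContinuationInterceptor

// Hypothetical helper: returns (child inherited the parent's dispatcher,
// child used the explicitly passed dispatcher).
fun dispatcherChoices(): Pair<Boolean, Boolean> = runBlocking {
    val parentDispatcher = coroutineContext[ContinuationInterceptor]
    var inherited = false
    var explicit = false

    // No context argument: EmptyCoroutineContext, so the parent's dispatcher is kept.
    launch {
        inherited = coroutineContext[ContinuationInterceptor] === parentDispatcher
    }.join()

    // Explicit context argument: it replaces the inherited dispatcher.
    launch(Dispatchers.Default) {
        explicit = coroutineContext[ContinuationInterceptor] === Dispatchers.Default
    }.join()

    inherited to explicit
}

fun main() {
    println(dispatcherChoices()) // (true, true)
}
```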

2. The withContext() function also takes a CoroutineContext parameter.

public suspend fun <T> withContext(
    context: CoroutineContext,
    block: suspend CoroutineScope.() -> T
): T {
    ......
}

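import kotlinx.coroutines.*

suspend fun getUserInfo(): String {
    printCoroutine("Before IO Context")
    withContext(Dispatchers.IO) {
        printCoroutine("In IO Context")
        delay(1000L)
    }
    printCoroutine("After IO Context")
    return "David"
}

fun main() = runBlocking {
    val userInfo = getUserInfo()
    printCoroutine(userInfo)
}

// Prints the value together with the name of the current thread.
// The "@coroutine#1" suffix in the log shows up when coroutine debug mode
// is enabled (-Dkotlinx.coroutines.debug).
fun printCoroutine(any: Any?) {
    println("" + any + ";Thread:" + Thread.currentThread().name)
}
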

Log:

Before IO Context;Thread:main @coroutine#1
In IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
After IO Context;Thread:main @coroutine#1
David;Thread:main @coroutine#1

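Once withContext() is given Dispatchers.IO, the code inside its lambda is dispatched to the DefaultDispatcher thread pool, while all the code outside it still runs on the main thread.

If Dispatchers.IO is passed to runBlocking instead, all of the code runs on the DefaultDispatcher thread pool.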
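The signature above also shows that withContext() returns whatever its block returns (the type parameter T). A small sketch of that, assuming kotlinx.coroutines is available; `sumOnDefault` is a hypothetical helper, not something from the library:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: computes a sum on Dispatchers.Default and hands the
// result back to the caller, which resumes in its original context.
suspend fun sumOnDefault(n: Int): Int =
    withContext(Dispatchers.Default) { (1..n).sum() }

fun main() = runBlocking {
    println(sumOnDefault(100)) // 5050
}
```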

fun main() = runBlocking(Dispatchers.IO) {
    val userInfo = getUserInfo()
    printCoroutine(userInfo)
}

suspend fun getUserInfo(): String {
    printCoroutine("Before IO Context")
    withContext(Dispatchers.IO) {
        printCoroutine("In IO Context")
        delay(1000L)
    }
    printCoroutine("After IO Context")
    return "David"
}


Log:

Before IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
In IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
After IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
David;Thread:DefaultDispatcher-worker-1 @coroutine#1

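3. Built-in Dispatchers

Dispatchers.Main is only meaningful on UI platforms, for example the main thread on Android.

Dispatchers.Unconfined is not confined to any particular thread, so its code may end up running on any thread.

Dispatchers.Default is a thread pool for CPU-intensive work. In general its thread count matches the number of CPU cores on the machine, with a lower bound of 2.

Dispatchers.IO is a thread pool for IO-intensive work. Its thread count can be configured through the kotlinx.coroutines.io.parallelism property.

Note: under the hood, Dispatchers.IO may reuse threads that belong to Dispatchers.Default.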
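One way to observe the descriptions above is to ask each dispatcher which thread it actually uses. This is only a sketch for the JVM, assuming kotlinx.coroutines is on the classpath; `threadNameOn` is a hypothetical helper name. Dispatchers.Main is left out because it needs a UI platform.

```kotlin
import kotlinx.coroutines.CoroutineDispatcher
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

// Hypothetical helper: runs a block on the given dispatcher and returns the
// name of the thread that executed it.
fun threadNameOn(dispatcher: CoroutineDispatcher): String = runBlocking {
    withContext(dispatcher) { Thread.currentThread().name }
}

fun main() {
    // Default and IO both report a "DefaultDispatcher-worker-N" thread,
    // because they draw from a shared pool of worker threads.
    println(threadNameOn(Dispatchers.Default))
    println(threadNameOn(Dispatchers.IO))
    // Unconfined does not switch threads here: the block starts on the caller's thread.
    println(threadNameOn(Dispatchers.Unconfined))
}
```

The exact worker number varies from run to run, so the sketch prints the names instead of hard-coding them.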

suspend fun getUserInfo(): String {
    printCoroutine("Before IO Context")
    withContext(Dispatchers.IO) {
        printCoroutine("In IO Context")
        delay(1000L)
    }
    printCoroutine("After IO Context")
    return "David"
}

fun main() = runBlocking(Dispatchers.Default) {
    val userInfo = getUserInfo()
    printCoroutine(userInfo)
}

Log:
Before IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
In IO Context;Thread:DefaultDispatcher-worker-2 @coroutine#1
After IO Context;Thread:DefaultDispatcher-worker-2 @coroutine#1
David;Thread:DefaultDispatcher-worker-2 @coroutine#1

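When the Dispatchers.Default pool has spare threads, the IO pool can reuse them. That thread sharing is why "In IO Context" in the log above also ran on a DefaultDispatcher-worker thread.

4. Using a custom Dispatcher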

import kotlinx.coroutines.*
import java.util.concurrent.Executors

fun main() = runBlocking(mySingleDispatcher) {
    val userInfo = getUserInfo()
    printCoroutine(userInfo)
}

// A dispatcher backed by one daemon thread named "MySingleThread".
val mySingleDispatcher = Executors.newSingleThreadExecutor {
    Thread(it, "MySingleThread").apply {
        isDaemon = true
    }
}.asCoroutineDispatcher()

suspend fun getUserInfo(): String {
    printCoroutine("Before IO Context")
    withContext(Dispatchers.IO) {
        printCoroutine("In IO Context")
        delay(1000L)
    }
    printCoroutine("After IO Context")
    return "David"
}



Log:

Before IO Context;Thread:MySingleThread @coroutine#1
In IO Context;Thread:DefaultDispatcher-worker-1 @coroutine#1
After IO Context;Thread:MySingleThread @coroutine#1
David;Thread:MySingleThread @coroutine#1

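The asCoroutineDispatcher() extension function turns the executor into a Dispatcher. A Dispatcher is ultimately backed by threads, and coroutines run on top of those threads. Once the custom mySingleDispatcher, which has a single thread underneath, is passed to runBlocking, only "In IO Context" runs on the DefaultDispatcher thread pool; all the other code runs on mySingleDispatcher.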
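The example above relies on a daemon thread so that the JVM can exit. An alternative, sketched here under the assumption that the dispatcher is only needed for a bounded piece of work, is to close it explicitly: the dispatcher returned by asCoroutineDispatcher() is Closeable, so `use` shuts the executor down afterwards. The function name `threadNameOnCustomDispatcher` is made up for this illustration.

```kotlin
import kotlinx.coroutines.asCoroutineDispatcher
import kotlinx.coroutines.runBlocking
import java.util.concurrent.Executors

// Hypothetical helper: creates a single-thread dispatcher, runs one coroutine
// on it, and closes the underlying executor when the block finishes.
fun threadNameOnCustomDispatcher(): String =
    Executors.newSingleThreadExecutor { runnable -> Thread(runnable, "MySingleThread") }
        .asCoroutineDispatcher()
        .use { dispatcher ->
            runBlocking(dispatcher) { Thread.currentThread().name }
        }

fun main() {
    // Prints a name starting with "MySingleThread"
    // (debug mode may append a coroutine id to it).
    println(threadNameOnCustomDispatcher())
}
```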

II. CoroutineContext

1. CoroutineScope

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    ......
}

public interface CoroutineScope {

    public val coroutineContext: CoroutineContext
}

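CoroutineScope is an interface with exactly one member, coroutineContext. In other words, CoroutineScope is only a thin wrapper around CoroutineContext, and its core capabilities all come from CoroutineContext.

What CoroutineScope is for: it makes it convenient to control coroutines in bulk. In the example below, a single scope.cancel() call after 500 ms cancels all three coroutines, so none of the "End" messages is printed.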

fun main() = runBlocking {
    val scope = CoroutineScope(Job())
    scope.launch {
        printCoroutine("First start!")
        delay(1000L)
        printCoroutine("First End!")
    }

    scope.launch {
        printCoroutine("Second start!")
        delay(1000L)
        printCoroutine("Second End!")
    }

    scope.launch {
        printCoroutine("Third start!")
        delay(1000L)
        printCoroutine("Third End!")
    }

    delay(500L)
    scope.cancel()
    delay(1000L)
}

Log:

First start!;Thread:DefaultDispatcher-worker-1 @coroutine#2
Second start!;Thread:DefaultDispatcher-worker-2 @coroutine#3
Third start!;Thread:DefaultDispatcher-worker-3 @coroutine#4
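The log only shows the effect indirectly, through the missing "End" lines. As a complementary sketch (assuming kotlinx.coroutines; `cancelAllViaScope` is a hypothetical name), the cancellation can also be checked on the Job objects themselves:

```kotlin
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancel
import kotlinx.coroutines.delay
import kotlinx.coroutines.joinAll
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking

// Hypothetical helper: starts three long-running coroutines in one scope,
// cancels the scope once, and reports whether each job ended up cancelled.
fun cancelAllViaScope(): List<Boolean> = runBlocking {
    val scope = CoroutineScope(Job())
    val jobs = List(3) { scope.launch { delay(10_000L) } }

    scope.cancel()   // one call cancels every coroutine started in the scope
    jobs.joinAll()   // wait until all of them have finished cancelling

    jobs.map { it.isCancelled }
}

fun main() {
    println(cancelAllViaScope()) // [true, true, true]
}
```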

2. Job and Dispatcher

Job extends CoroutineContext.Element, and CoroutineContext.Element extends CoroutineContext, so Job indirectly extends CoroutineContext. In that sense, a Job is itself a CoroutineContext.

public interface Job : CoroutineContext.Element {}

public interface CoroutineContext {
    public interface Element : CoroutineContext {}
}

The interface design of CoroutineContext itself:

@SinceKotlin("1.3")
public interface CoroutineContext {

    public operator fun <E : Element> get(key: Key<E>): E?

    public fun <R> fold(initial: R, operation: (R, Element) -> R): R

    public operator fun plus(context: CoroutineContext): CoroutineContext =
        if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
            context.fold(this) { acc, element ->
                val removed = acc.minusKey(element.key)
                if (removed === EmptyCoroutineContext) element else {
                    // make sure interceptor is always last in the context (and thus is fast to get when present)
                    val interceptor = removed[ContinuationInterceptor]
                    if (interceptor == null) CombinedContext(removed, element) else {
                        val left = removed.minusKey(ContinuationInterceptor)
                        if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
                            CombinedContext(CombinedContext(left, element), interceptor)
                    }
                }
            }

    public fun minusKey(key: Key<*>): CoroutineContext

    public interface Key<E : Element>

    public interface Element : CoroutineContext {

        public val key: Key<*>

        public override operator fun <E : Element> get(key: Key<E>): E? =
            @Suppress("UNCHECKED_CAST")
            if (this.key == key) this as E else null

        public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
            operation(initial, this)

        public override fun minusKey(key: Key<*>): CoroutineContext =
            if (this.key == key) EmptyCoroutineContext else this
    }
}

Judging by get(), plus(), minusKey() and fold(), the interface design of CoroutineContext closely resembles a Map, and a CoroutineContext can be used much like one.
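The Map analogy can be tried out with the standard library alone. In the sketch below, `UserId`, `RequestTag` and `countElements` are hypothetical names defined purely for illustration; only CoroutineContext and AbstractCoroutineContextElement come from kotlin.coroutines.

```kotlin
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext

// Hypothetical context elements. The companion object doubles as the key,
// the same pattern Job and CoroutineName use.
class UserId(val value: Int) : AbstractCoroutineContextElement(UserId) {
    companion object Key : CoroutineContext.Key<UserId>
}

class RequestTag(val value: String) : AbstractCoroutineContextElement(RequestTag) {
    companion object Key : CoroutineContext.Key<RequestTag>
}

// fold() visits every element, much like iterating over a map's values.
fun countElements(context: CoroutineContext): Int =
    context.fold(0) { count, _ -> count + 1 }

fun main() {
    val context = UserId(1) + RequestTag("a")     // plus(): like putting two entries
    println(context[UserId]?.value)               // 1 (get(): lookup by key)

    val replaced = context + UserId(2)            // same key: the right-hand element wins
    println(replaced[UserId]?.value)              // 2
    println(countElements(replaced))              // 2

    val removed = replaced.minusKey(UserId)       // minusKey(): like remove
    println(removed[UserId])                      // null
    println(removed[RequestTag]?.value)           // a
}
```

Unlike a general-purpose Map, each key here identifies at most one element of a fixed type, which is why the lookups come back already typed as UserId? or RequestTag?.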

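// CoroutineDispatcher's key is marked @ExperimentalStdlibApi in kotlinx.coroutines,
// so looking it up needs an explicit opt-in.
@OptIn(ExperimentalStdlibApi::class)
fun main() = runBlocking {
    val scope = CoroutineScope(Job() + mySingleDispatcher)
    scope.launch {
        printCoroutine(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
        delay(1000L)
        printCoroutine("First end!")
    }

    delay(500L)
    scope.cancel()
    delay(1000L)
}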

Log:

true;Thread:MySingleThread @coroutine#2

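The CoroutineScope is created with "Job() + mySingleDispatcher". This works because plus() on CoroutineContext is an overloaded operator. get() is an operator as well, which is what makes the coroutineContext[CoroutineDispatcher] lookup possible.

public operator fun plus(context: CoroutineContext): CoroutineContext

public operator fun <E : Element> get(key: Key<E>): E?

Once the scope has been created, every coroutine subsequently launched from it runs on the mySingleDispatcher thread.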


public actual object Dispatchers {

    public actual val Default: CoroutineDispatcher = DefaultScheduler

    public actual val Main: MainCoroutineDispatcher get() = MainDispatcherLoader.dispatcher

    public actual val Unconfined: CoroutineDispatcher = kotlinx.coroutines.Unconfined

    public val IO: CoroutineDispatcher = DefaultIoScheduler

    public fun shutdown() { }
}

public abstract class CoroutineDispatcher :
    AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { }

public interface ContinuationInterceptor : CoroutineContext.Element { }

Dispatchers is an object singleton whose members are of type CoroutineDispatcher. CoroutineDispatcher in turn implements ContinuationInterceptor, and that interface extends CoroutineContext.Element. So a Dispatcher really is a CoroutineContext.

3. CoroutineName
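Because a dispatcher is an ordinary context element stored under the ContinuationInterceptor key, it can be combined and looked up like any other element. A minimal sketch, assuming kotlinx.coroutines is on the classpath:

```kotlin
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlin.coroutines.ContinuationInterceptor
import kotlin.coroutines.CoroutineContext

fun main() {
    // A Job and a dispatcher are both elements, so plus() combines them.
    val context: CoroutineContext = Job() + Dispatchers.Default
    println(context[ContinuationInterceptor] === Dispatchers.Default) // true
    println(context[Job] != null)                                     // true

    // Adding another dispatcher replaces the old one, since both use the same key.
    val swapped = context + Dispatchers.IO
    println(swapped[ContinuationInterceptor] === Dispatchers.IO)      // true
}
```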

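// CoroutineDispatcher's key is marked @ExperimentalStdlibApi, hence the opt-in.
@OptIn(ExperimentalStdlibApi::class)
fun main() = runBlocking {
    val scope = CoroutineScope(Job() + mySingleDispatcher)
    scope.launch(CoroutineName("MyFirstCoroutine!")) {
        printCoroutine(coroutineContext[CoroutineDispatcher] == mySingleDispatcher)
        delay(1000L)
        printCoroutine("First end!")
    }

    delay(500L)
    scope.cancel()
    delay(1000L)
}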

true;Thread:MySingleThread @MyFirstCoroutine!#2

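Passing CoroutineName("MyFirstCoroutine!") to launch gives the coroutine a name, which produces the "@MyFirstCoroutine!#2" part of the output.

4. CoroutineExceptionHandler

CoroutineExceptionHandler lets you install a custom exception handler; uncaught exceptions are delivered to its handleException() function.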
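The name in the thread label depends on debug mode, but the name itself is a regular context element and can be read back in code. A short sketch, assuming kotlinx.coroutines; `currentCoroutineName` is a hypothetical helper:

```kotlin
import kotlinx.coroutines.CoroutineName
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
import kotlin.coroutines.coroutineContext

// Hypothetical helper: reads the CoroutineName element of the calling coroutine,
// or null if none was set.
suspend fun currentCoroutineName(): String? = coroutineContext[CoroutineName]?.name

fun main() = runBlocking {
    val name = withContext(CoroutineName("MyFirstCoroutine!")) {
        currentCoroutineName()
    }
    println(name) // MyFirstCoroutine!
}
```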



public interface CoroutineExceptionHandler : CoroutineContext.Element {

    public companion object Key : CoroutineContext.Key<CoroutineExceptionHandler>

    public fun handleException(context: CoroutineContext, exception: Throwable)
}

fun main() = runBlocking {
    // The first lambda parameter is the CoroutineContext; it is not needed here.
    val myExceptionHandler = CoroutineExceptionHandler { _, throwable ->
        println("Catch exception: $throwable")
    }
    val scope = CoroutineScope(Job() + mySingleDispatcher)
    val job = scope.launch(myExceptionHandler) {
        val s: String? = null
        s!!.length // throws NullPointerException
    }
    job.join()
}

Catch exception: java.lang.NullPointerException
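Since the handler is just another context element, it can also be installed once on the scope with plus() instead of being passed to each launch call. The sketch below assumes kotlinx.coroutines; `runAndCapture` is a made-up name, and the exception is stored instead of printed so that the result can be inspected.

```kotlin
import kotlinx.coroutines.CoroutineExceptionHandler
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Job
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.util.concurrent.atomic.AtomicReference

// Hypothetical helper: launches a failing coroutine in a scope whose context
// contains the handler, and returns whatever the handler received.
fun runAndCapture(): Throwable? {
    val captured = AtomicReference<Throwable?>(null)
    val handler = CoroutineExceptionHandler { _, throwable -> captured.set(throwable) }

    runBlocking {
        // Every coroutine launched from this scope inherits the handler.
        val scope = CoroutineScope(Job() + handler)
        scope.launch { throw IllegalStateException("boom") }.join()
    }
    return captured.get()
}

fun main() {
    println(runAndCapture()) // java.lang.IllegalStateException: boom
}
```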
