Kotlin Coroutines不复杂, 我来帮你理一理

Posted mengdd

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin Coroutines不复杂, 我来帮你理一理相关的知识,希望对你有一定的参考价值。

Coroutines 协程

最近在总结Kotlin的一些东西, 发现协程这块确实不容易说清楚. 之前的那篇就写得不好, 所以决定重写.
反复研究了官网文档和各种教程博客, 本篇内容是最基础也最主要的内容, 力求小白也能看懂并理解.

Coroutines概念

Coroutines(协程), 计算机程序组件, 通过允许任务挂起和恢复执行, 来支持非抢占式的多任务. (见Wiki).

协程主要是为了异步, 非阻塞的代码. 这个概念并不是Kotlin特有的, Go, Python等多个语言中都有支持.

Kotlin Coroutines

Kotlin中用协程来做异步和非阻塞任务, 主要优点是代码可读性好, 不用回调函数. (用协程写的异步代码乍一看很像同步代码.)

Kotlin对协程的支持是在语言级别的, 在标准库中只提供了最低程度的APIs, 然后把很多功能都代理到库中.

Kotlin中只加了suspend作为关键字.
asyncawait不是Kotlin的关键字, 也不是标准库的一部分.

比起futures和promises, kotlin中suspending function的概念为异步操作提供了一种更安全和不易出错的抽象.

kotlinx.coroutines是协程的库, 为了使用它的核心功能, 项目需要增加kotlinx-coroutines-core的依赖.

Coroutines Basics: 协程到底是什么?

先上一段官方的demo:

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch


fun main() {
    GlobalScope.launch { // launch a new coroutine in background and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World!") // print after delay
    }
    println("Hello,") // main thread continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

这段代码的输出:
先打印Hello, 延迟1s之后, 打印World.

对这段代码的解释:

launch开始了一个计算, 这个计算是可挂起的(suspendable), 它在计算过程中, 释放了底层的线程, 当协程执行完成, 就会恢复(resume).

这种可挂起的计算就叫做一个协程(coroutine). 所以我们可以简单地说launch开始了一个新的协程.

注意, 主线程需要等待协程结束, 如果注释掉最后一行的Thread.sleep(2000L), 则只打印Hello, 没有World.

协程和线程的关系

coroutine(协程)可以理解为轻量级的线程. 多个协程可以并行运行, 互相等待, 互相通信. 协程和线程的最大区别就是协程非常轻量(cheap), 我们可以创建成千上万个协程而不必考虑性能.

协程是运行在线程上可以被挂起的运算. 可以被挂起, 意味着运算可以被暂停, 从线程移除, 存储在内存里. 此时, 线程就可以自由做其他事情. 当计算准备好继续进行时, 它会返回线程(但不一定要是同一个线程).

默认情况下, 协程运行在一个共享的线程池里, 线程还是存在的, 只是一个线程可以运行多个协程, 所以线程没必要太多.

调试

在上面的代码中加上线程的名字:

fun main() {
    GlobalScope.launch {
        // launch a new coroutine in background and continue
        delay(1000L) // non-blocking delay for 1 second (default time unit is ms)
        println("World! + ${Thread.currentThread().name}") // print after delay
    }
    println("Hello, + ${Thread.currentThread().name}") // main thread continues while coroutine is delayed
    Thread.sleep(2000L) // block main thread for 2 seconds to keep JVM alive
}

可以在IDE的Edit Configurations中设置VM options: -Dkotlinx.coroutines.debug, 运行程序, 会在log中打印出代码运行的协程信息:

Hello, + main
World! + DefaultDispatcher-worker-1 @coroutine#1

suspend function

上面例子中的delay方法是一个suspend function.
delay()Thread.sleep()的区别是: delay()方法可以在不阻塞线程的情况下延迟协程. (It doesn‘t block a thread, but only suspends the coroutine itself). 而Thread.sleep()则阻塞了当前线程.

所以, suspend的意思就是协程作用域被挂起了, 但是当前线程中协程作用域之外的代码不被阻塞.

如果把GlobalScope.launch替换为thread, delay方法下面会出现红线报错:

Suspend functions are only allowed to be called from a coroutine or another suspend function

suspend方法只能在协程或者另一个suspend方法中被调用.

在协程等待的过程中, 线程会返回线程池, 当协程等待结束, 协程会在线程池中一个空闲的线程上恢复. (The thread is returned to the pool while the coroutine is waiting, and when the waiting is done, the coroutine resumes on a free thread in the pool.)

启动协程

启动一个新的协程, 常用的主要有以下几种方式:

  • launch
  • async
  • runBlocking

它们被称为coroutine builders. 不同的库可以定义其他更多的构建方式.

runBlocking: 连接blocking和non-blocking的世界

runBlocking用来连接阻塞和非阻塞的世界.

runBlocking可以建立一个阻塞当前线程的协程. 所以它主要被用来在main函数中或者测试中使用, 作为连接函数.

比如前面的例子可以改写成:

fun main() = runBlocking<Unit> {
    // start main coroutine
    GlobalScope.launch {
        // launch a new coroutine in background and continue
        delay(1000L)
        println("World! + ${Thread.currentThread().name}")
    }
    println("Hello, + ${Thread.currentThread().name}") // main coroutine continues here immediately
    delay(2000L) // delaying for 2 seconds to keep JVM alive
}

最后不再使用Thread.sleep(), 使用delay()就可以了.
程序输出:

Hello, + main @coroutine#1
World! + DefaultDispatcher-worker-1 @coroutine#2

launch: 返回Job

上面的例子delay了一段时间来等待一个协程结束, 不是一个好的方法.

launch返回Job, 代表一个协程, 我们可以用Jobjoin()方法来显式地等待这个协程结束:

fun main() = runBlocking {
    val job = GlobalScope.launch {
        // launch a new coroutine and keep a reference to its Job
        delay(1000L)
        println("World! + ${Thread.currentThread().name}")
    }
    println("Hello, + ${Thread.currentThread().name}")
    job.join() // wait until child coroutine completes
}

输出结果和上面是一样的.

Job还有一个重要的用途是cancel(), 用于取消不再需要的协程任务.

async: 从协程返回值

async开启线程, 返回Deferred<T>, Deferred<T>Job的子类, 有一个await()函数, 可以返回协程的结果.

await()也是suspend函数, 只能在协程之内调用.

fun main() = runBlocking {
    // @coroutine#1
    println(Thread.currentThread().name)
    val deferred: Deferred<Int> = async {
        // @coroutine#2
        loadData()
    }
    println("waiting..." + Thread.currentThread().name)
    println(deferred.await()) // suspend @coroutine#1
}

suspend fun loadData(): Int {
    println("loading..." + Thread.currentThread().name)
    delay(1000L) // suspend @coroutine#2
    println("loaded!" + Thread.currentThread().name)
    return 42
}

运行结果:

main @coroutine#1
waiting...main @coroutine#1
loading...main @coroutine#2
loaded!main @coroutine#2
42

Context, Dispatcher和Scope

看一下launch方法的声明:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
...
}

其中有几个相关概念我们要了解一下.

协程总是在一个context下运行, 类型是接口CoroutineContext. 协程的context是一个索引集合, 其中包含各种元素, 重要元素就有Job和dispatcher. Job代表了这个协程, 那么dispatcher是做什么的呢?

构建协程的coroutine builder: launch, async, 都是CoroutineScope类型的扩展方法. 查看CoroutineScope接口, 其中含有CoroutineContext的引用. scope是什么? 有什么作用呢?

下面我们就来回答这些问题.

Dispatchers和线程

Context中的CoroutineDispatcher可以指定协程运行在什么线程上. 可以是一个指定的线程, 线程池, 或者不限.

看一个例子:

fun main() = runBlocking<Unit> {
    launch {
        // context of the parent, main runBlocking coroutine
        println("main runBlocking      : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Unconfined) {
        // not confined -- will work with main thread
        println("Unconfined            : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(Dispatchers.Default) {
        // will get dispatched to DefaultDispatcher
        println("Default               : I'm working in thread ${Thread.currentThread().name}")
    }
    launch(newSingleThreadContext("MyOwnThread")) {
        // will get its own new thread
        println("newSingleThreadContext: I'm working in thread ${Thread.currentThread().name}")
    }
}

运行后打印出:

Unconfined            : I'm working in thread main
Default               : I'm working in thread DefaultDispatcher-worker-1
newSingleThreadContext: I'm working in thread MyOwnThread
main runBlocking      : I'm working in thread main

API提供了几种选项:

  • Dispatchers.Default代表使用JVM上的共享线程池, 其大小由CPU核数决定, 不过即便是单核也有两个线程. 通常用来做CPU密集型工作, 比如排序或复杂计算等.
  • Dispatchers.Main指定主线程, 用来做UI更新相关的事情. (需要添加依赖, 比如kotlinx-coroutines-android.) 如果我们在主线程上启动一个新的协程时, 主线程忙碌, 这个协程也会被挂起, 仅当线程有空时会被恢复执行.
  • Dispatchers.IO: 采用on-demand创建的线程池, 用于网络或者是读写文件的工作.
  • Dispatchers.Unconfined: 不指定特定线程, 这是一个特殊的dispatcher.

如果不明确指定dispatcher, 协程将会继承它被启动的那个scope的context(其中包含了dispatcher).

在实践中, 更推荐使用外部scope的dispatcher, 由调用方决定上下文. 这样也方便测试.

newSingleThreadContext创建了一个线程来跑协程, 一个专注的线程算是一种昂贵的资源, 在实际的应用中需要被释放或者存储复用.

切换线程还可以用withContext, 可以在指定的协程context下运行代码, 挂起直到它结束, 返回结果.
另一种方式是新启一个协程, 然后用join明确地挂起等待.

在Android这种UI应用中, 比较常见的做法是, 顶部协程用CoroutineDispatchers.Main, 当需要在别的线程上做一些事情的时候, 再明确指定一个不同的dispatcher.

Scope是什么?

launch, asyncrunBlocking开启新协程的时候, 它们自动创建相应的scope. 所有的这些方法都有一个带receiver的lambda参数, 默认的receiver类型是CoroutineScope.

IDE会提示this: CoroutineScope:

launch { /* this: CoroutineScope */
}

当我们在runBlocking, launch, 或async的大括号里面再创建一个新的协程的时候, 自动就在这个scope里创建:

fun main() = runBlocking {
    /* this: CoroutineScope */
    launch { /* ... */ }
    // the same as:
    this.launch { /* ... */ }
}

因为launch是一个扩展方法, 所以上面例子中默认的receiver是this.
这个例子中launch所启动的协程被称作外部协程(runBlocking启动的协程)的child. 这种"parent-child"的关系通过scope传递: child在parent的scope中启动.

协程的父子关系:

  • 当一个协程在另一个协程的scope中被启动时, 自动继承其context, 并且新协程的Job会作为父协程Job的child.

所以, 关于scope目前有两个关键知识点:

  • 我们开启一个协程的时候, 总是在一个CoroutineScope里.
  • Scope用来管理不同协程之间的父子关系和结构.

协程的父子关系有以下两个特性:

  • 父协程被取消时, 所有的子协程都被取消.
  • 父协程永远会等待所有的子协程结束.

值得注意的是, 也可以不启动协程就创建一个新的scope. 创建scope可以用工厂方法: MainScope()CoroutineScope().

coroutineScope()方法也可以创建scope. 当我们需要以结构化的方式在suspend函数内部启动新的协程, 我们创建的新的scope, 自动成为suspend函数被调用的外部scope的child.

上面的父子关系, 可以进一步抽象到, 没有parent协程, 由scope来管理其中所有的子协程.

Scope在实际应用中解决什么问题呢? 如果我们的应用中, 有一个对象是有自己的生命周期的, 但是这个对象又不是协程, 比如Android应用中的Activity, 其中启动了一些协程来做异步操作, 更新数据等, 当Activity被销毁的时候需要取消所有的协程, 来避免内存泄漏. 我们就可以利用CoroutineScope来做这件事: 创建一个CoroutineScope对象和activity的生命周期绑定, 或者让activity实现CoroutineScope接口.

所以, scope的主要作用就是记录所有的协程, 并且可以取消它们.

A CoroutineScope keeps track of all your coroutines, and it can cancel all of the coroutines started in it.

Structured Concurrency

这种利用scope将协程结构化组织起来的机制, 被称为"structured concurrency".
好处是:

  • scope自动负责子协程, 子协程的生命和scope绑定.
  • scope可以自动取消所有的子协程.
  • scope自动等待所有的子协程结束. 如果scope和一个parent协程绑定, 父协程会等待这个scope中所有的子协程完成.

通过这种结构化的并发模式: 我们可以在创建top级别的协程时, 指定主要的context一次, 所有嵌套的协程会自动继承这个context, 只在有需要的时候进行修改即可.

GlobalScope: daemon

GlobalScope启动的协程都是独立的, 它们的生命只受到application的限制. 即GlobalScope启动的协程没有parent, 和它被启动时所在的外部的scope没有关系.

launch(Dispatchers.Default) { ... }GlobalScope.launch { ... }用的dispatcher是一样的.

GlobalScope启动的协程并不会保持进程活跃. 它们就像daemon threads(守护线程)一样, 如果JVM发现没有其他一般的线程, 就会关闭.

参考

第三方博客:

以上是关于Kotlin Coroutines不复杂, 我来帮你理一理的主要内容,如果未能解决你的问题,请参考以下文章

androidx.paging.DataSource.Factory 时 Room 不检索数据(使用 LiveData + Kotlin Coroutines)

Spring WebFlux Reactive 和 Kotlin Coroutines 启动错误

Kotlin Coroutines 协程实现原理全解析

Kotlin Coroutines 中的 main-safe 是啥?

Kotlin协程基础介绍--协程(Coroutines)是一种轻量级的线程

Kotlin Coroutines 的现有 3 函数回调