对比Java学Kotlin协程-创建和取消
Posted 陈蒙_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了对比Java学Kotlin协程-创建和取消相关的知识,希望对你有一定的参考价值。
文章目录
v1.6 Kotlin 协程全景图:
一、创建协程
截至 2022.05 月,Java 尚未在正式版中支持协程,而是通过织布机计划实验性的支持了虚拟线程,作用类似于协程。
在 Java 里面,我们可以通过 Thread 类来创建一个线程:
在 Kotlin 中,我们怎么创建一个协程呢?
有如下方法可以创建协程:runBlocking()、launch()、async()、produce()、actor(),我们一一介绍。
launch
我们先来创建一个最简单的协程:
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch
fun main()
val job : Job = GlobalScope.launch
delay(1000)
println("World!")
print("Hello, ")
Thread.sleep(1500)
我们逐行解读代码。
首先是 GlobalScope。GlobalScope 是协程作用域(CoroutineScope)的一种。每个协程必须归属于某个 CoroutineScope,称为“结构性并发”(Structured Concurrency),不仅 Kotlin,Swift 也通过 Task 支持了结构性并发。
有了 CoroutineScope,我们可以很方便的对作用域下注册的所有协程进行统一管理,比如将其全部取消执行等。除了 GlobalScope,还有为安卓设计的 MainScope 以及用 CoroutineScope.coroutineScope() 构造的自定义作用域。借助 MainScope,我们可以在 Activity 销毁时取消执行协程,从而避免内存泄漏。
launch 是 CoroutineScope 的扩展函数,其完整的声明是:
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
launch() 是最常见的创建协程的方法,返回一个 Job,可以通过 Job.cancel() 来终止该 Job 下所有的协程,其效果跟 CoroutineScope 是一样的,其实 CoroutineScope.cancel() 也是委托给 Job.cancel() 来实现的。
通过 launch() 我们就启动了一个协程,虽然里面只有两行代码:
val job : Job = GlobalScope.launch
delay(1000)
println("World!")
为了方便组织代码,我们可以将上述两行代码封装成一个方法。注意,如果该方法是耗时方法,则需要在前面加上 suspend 关键字:
fun main()
val job : Job = GlobalScope.launch
printDelayedWorld()
print("Hello, ")
Thread.sleep(1500)
suspend fun printDelayedWorld()
delay(1000)
println("World!")
runBlocking
最后需要注意的是 Thread.sleep(1500)
这行代码:
fun main()
val job : Job = GlobalScope.launch
delay(1000)
println("World!")
print("Hello, ")
Thread.sleep(1500)
这行代码是必现要有的,如果没有这行代码,上面的程序只能打印出 “Hello, ”,而无法打印出“World!”,因为在执行完print("Hello, ")
之后,整个进程就结束了,里面的所有线程也就被强制结束了,运行在线程之上的协程也就无法执行了。我们可以将上述代码进行如下改写,也能达到相同的目的:
fun main() = runBlocking
val job : Job = GlobalScope.launch
delay(1000)
println("World!")
print("Hello, ")
async & await
除了 launch(),我们还有其他协程构造器,比如 async()、produce()、actor(),其中 async() 方法不会立即启动一个协程,而是在调用 await() 方法时才会启动,需要注意的是 await() 是同步的,比如下面代码中 println("Completed in $time ms")
必须等待2个 await() 执行完毕后再执行:
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis
fun main() = runBlocking(CoroutineName("main"))
val time = measureTimeMillis
val one = async(CoroutineName("v1")) one()
val two = async(CoroutineName("v2")) two()
log("launched two async")
log("the answer is $one.await() + two.await()")
println("Completed in $time ms")
suspend fun one(): Int
delay(1000)
log("Computing v1")
return 19
suspend fun two(): Int
delay(500)
log("Computing v2")
return 10
fun log(msg: String) = println("[$Thread.currentThread().name] $msg")
运行结果:
[main]launched two async
[main]Computing v1
[main]Computing v1
[main]the answer is 29
Completed in 1020 ms
async()
构造器返回值类型是 Deferred
,Deferred
是 Job
的子类。
然后是 delay() 方法。这是一个非阻塞的方法。非阻塞的意思是不会让线程由 Run 状态转入 Block 状态,该线程可以继续执行其他代码。相比之下,Thread.sleep() 则是阻塞方法,会使线程处于阻塞状态,无法继续执行。
需要注意的是,GlobalScope 的生命周期与应用相同,一般不建议使用,因为很容易因遗忘cancel()操作而导致内存泄漏。可以使用自定义的作用域:
fun main() = runBlocking
coroutineScope
launch
launch
看到 async() 和 await(),让人不免联想到其他语言中的 async 和 await 关键字,比如 javascript、Dart、Swift 和 C#。我们简单比较下二者的异同:
- Kotlin 的 async & await 是以扩展方法的形式存在的,其他语言是以关键字的形式存在;
- Kotlin 的 await() 返回的是一个 Deferred 类型,这与 Js 的 Promise、Dart 的 Future 和 Swift 的 Task 是类似的,执行到 await 都会挂起。
二、取消协程
取消正在运行中的协程的方法有:cancel,withTimeout,抛异常。
cancel()
我们可以调用 CoroutineScope.cancel() 来取消该作用域下的所有协程,包括子孙协程。我们在安卓开发中常见的 mainScope 就是采用的这种方式:
class MyandroidActivity
private val scope = MainScope()
override fun onDestroy()
super.onDestroy()
scope.cancel()
实际上 CoroutineScope.cancel() 内部调用的是 Job 的 cancel() 实现的:
/**
* Cancels this scope, including its job and all its children with an optional cancellation [cause].
* A cause can be used to specify an error message or to provide other details on
* a cancellation reason for debugging purposes.
* Throws [IllegalStateException] if the scope does not have a job in it.
*/
public fun CoroutineScope.cancel(cause: CancellationException? = null)
val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
job.cancel(cause)
如注释所述,如果该作用域没有 Job,则抛出 IllegalStateException 异常。
cancel() 方法执行之后 Job 内部发生了什么呢?具体可以看如下状态机:
cancel() 方法只对suspend()方法有效。因为 suspend() 方法执行时会先检查该作用域或Job是否被取消,如果被取消就不再往下执行了。而非 suspend() 方法因为没有这种功能,所以是无法生效的。比如:
import kotlinx.coroutines.*
fun main() = runBlocking
val job = launch(Dispatchers.Default)
try
println("coroutine started")
delay(1000)
println("coroutine completed")
catch (e: Exception)
e.printStackTrace()
finally
println("coroutine finally")
delay(500)
println("coroutine canceled")
job.cancel(CancellationException("i am canceled"))
job.join()
println("coroutine ended")
join() 方法的作用是等待所有协程执行完成。
上述代码的执行结果如下:
coroutine started
coroutine canceled
coroutine finally
coroutine ended
java.util.concurrent.CancellationException: i am canceled
at coroutine.basics.CancelKt$main$1.invokeSuspend(cancel.kt:24)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:166)
at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:518)
at kotlinx.coroutines.EventLoopImplBase$DelayedResumeTask.run(EventLoop.common.kt:489)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
at coroutine.basics.CancelKt.main(cancel.kt:5)
at coroutine.basics.CancelKt.main(cancel.kt)
协程被取消成功,被取消的时候会抛出 CancellationException。之所以能被取消,是因为协程代码块里面有 delay() 这个 suspend 方法,如果我们把 delay() 换成非 suspend 方法 Thread.sleep(),能被取消成功吗?我们试验下:
fun main() = runBlocking
val job = launch(Dispatchers.Default)
try
println("coroutine started")
Thread.sleep(1000) // 注意此处不再是 suspend 方法 delay()
println("coroutine completed")
catch (e: Exception)
e.printStackTrace()
finally
println("coroutine finally")
delay(500)
println("coroutine canceled")
job.cancel(CancellationException("i am canceled"))
job.join()
println("coroutine ended")
输出结果如下:
coroutine started
coroutine canceled
coroutine completed
coroutine finally
coroutine ended
可以看出,协程没有被取消。如果我们想不包含suspend方法的协程被取消,可以手动检查下当前协程的状态,如果 isActive 是 true 才往下执行,否则就结束掉:
fun main() = runBlocking
val job = launch(Dispatchers.Default)
try
println("coroutine started")
Thread.sleep(1000)
if (isActive)
println("coroutine completed")
catch (e: Exception)
e.printStackTrace()
finally
println("coroutine finally")
delay(500)
println("coroutine canceled")
job.cancel(CancellationException("i am canceled"))
job.join()
println("coroutine ended")
输出如下:
coroutine started
coroutine canceled
coroutine finally
coroutine ended
coroutine started
coroutine canceled
coroutine completed
coroutine finally
coroutine ended
需要注意的是,如果协程已经被取消了,在 finally 代码里面使用 suspend 方法是不会执行的:
import kotlinx.coroutines.*
fun main() = runBlocking
val job = launch(Dispatchers.Default)
try
println("coroutine started")
delay(1000)
catch (e: Exception)
e.printStackTrace()
finally
println("running finally")
delay(1000)
println("coroutine finally")
delay(500)
println("coroutine canceled")
job.cancelAndJoin()
println("coroutine ended")
输出如下:
coroutine started
coroutine canceled
running finally
coroutine ended
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutineCancelling@7ebf7c43
可以看出对应的这两行代码都没有执行:
delay(1000)
println("coroutine finally")
如果我们想让上述两行代码执行,可以使用 withContext(NonCancellable)
:
package coroutine.basics
import kotlinx.coroutines.*
fun main() = runBlocking
val job = launch(Dispatchers.Default)
try
println("coroutine started")
delay(1000)
catch (e: Exception)
e.printStackTrace()
finally
withContext(NonCancellable)
println("running finally")
delay(1000)
println("coroutine finally")
delay(500)
println("coroutine canceled")
job.cancelAndJoin()
println("coroutine ended")
上述代码输出如下:
coroutine started
coroutine canceled
running finally
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutineCancelling@70c1cc32
coroutine finally
coroutine ended
withTimeout
除了 cancel() 方法,我们还可以使用 withTimeout() 来取消协程:
fun main() = runBlocking
withTimeout(1300L)
repeat(1000) i ->
println("i am sleeping $i")
delay(500)
在过了 1300ms 后就会取消正在运行的协程,上述代码输出如下:
i am sleeping 0
i am sleeping 1
i am sleeping 2
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:186)
at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:156)
at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497)
at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:69)
at java.base/java.lang.Thread.run(Thread.java:831)
withTimeout() 通过抛出 TimeoutCancellationException 的方式来结束协程。TimeoutCancellationException 是 CancellationException 的子类。如果我们想在结束时做些额外的操作比如关闭资源等,可以使用 try-catch(e: TimeoutCancellationException) 。或者使用 withTimeoutOrNull
,该方法返回null而非抛出异常:
fun main() = runBlocking
val ret = withTimeoutOrNull(1300L)
repeat(1000) i ->
println("i am sleeping $i")
delay(500)
println("Result is $ret")
上述代码输出为:
i am sleeping 0
i am sleeping 1
i am sleeping 2
Result is null
需要注意的是,withTimeout
方法是异步的,是和其包裹着的代码块并发执行的,执行的时机可能是任何时候,包括代码块正在或已经执行完成 return 语句等。正因为此,我们在代码块里面打开或者获取需要关闭的资源时,一定要格外小心。比如我们有一个 Resource 类,来监控被打开和关闭的次数,我们用10w个协程模拟对该资源的打开和关闭操作:
var acquired = 0
class Resource
init
acquired++
fun close()
acquired--
fun main()
runBlocking
repeat(100_000)
launch
val resource = withTimeout(20)
delay(19)
Resource()
resource.close()
println(acquired)
根据执行环境不同,可以适当调整 20 和 19的数值,我们会发现多执行几次后打印的结果并不总是0。注意,上述代码并不会出现线程安全的问题,因为10w的协程都在是主线程中执行的。那么我们怎么避免这个问题呢?可以存储一个资源的引用,然后使用这个引用释放资源:
fun main()
runBlocking
repeat(1000_000)
launch
var resource: Resource? = null
try
withTimeout(20)
delay(19)
resource = Resource()
finally
resource?.close()
println(acquired)
这样每次输出都是0了,不会出现资源泄露的情况。
参考文章
以上是关于对比Java学Kotlin协程-创建和取消的主要内容,如果未能解决你的问题,请参考以下文章