对比Java学Kotlin协程-创建和取消

Posted 陈蒙_

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了对比Java学Kotlin协程-创建和取消相关的知识,希望对你有一定的参考价值。

文章目录


v1.6 Kotlin 协程全景图:

一、创建协程

截至 2022.05 月,Java 尚未在正式版中支持协程,而是通过织布机计划实验性的支持了虚拟线程,作用类似于协程。
在 Java 里面,我们可以通过 Thread 类来创建一个线程:


在 Kotlin 中,我们怎么创建一个协程呢?
有如下方法可以创建协程:runBlocking()、launch()、async()、produce()、actor(),我们一一介绍。

launch

我们先来创建一个最简单的协程:

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.delay
import kotlinx.coroutines.launch

fun main() 
    val job : Job = GlobalScope.launch 
        delay(1000)
        println("World!")
    
    print("Hello, ")
    Thread.sleep(1500)

我们逐行解读代码。

首先是 GlobalScope。GlobalScope 是协程作用域(CoroutineScope)的一种。每个协程必须归属于某个 CoroutineScope,称为“结构性并发”(Structured Concurrency),不仅 Kotlin,Swift 也通过 Task 支持了结构性并发。

有了 CoroutineScope,我们可以很方便的对作用域下注册的所有协程进行统一管理,比如将其全部取消执行等。除了 GlobalScope,还有为安卓设计的 MainScope 以及用 CoroutineScope.coroutineScope() 构造的自定义作用域。借助 MainScope,我们可以在 Activity 销毁时取消执行协程,从而避免内存泄漏。

launch 是 CoroutineScope 的扩展函数,其完整的声明是:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job 

launch() 是最常见的创建协程的方法,返回一个 Job,可以通过 Job.cancel() 来终止该 Job 下所有的协程,其效果跟 CoroutineScope 是一样的,其实 CoroutineScope.cancel() 也是委托给 Job.cancel() 来实现的。
通过 launch() 我们就启动了一个协程,虽然里面只有两行代码:

    val job : Job = GlobalScope.launch 
        delay(1000)
        println("World!")
    

为了方便组织代码,我们可以将上述两行代码封装成一个方法。注意,如果该方法是耗时方法,则需要在前面加上 suspend 关键字:

fun main() 
    val job : Job = GlobalScope.launch 
        printDelayedWorld()
    
    print("Hello, ")
    Thread.sleep(1500)


suspend fun printDelayedWorld() 
    delay(1000)
    println("World!")

runBlocking

最后需要注意的是 Thread.sleep(1500) 这行代码:

fun main() 
    val job : Job = GlobalScope.launch 
        delay(1000)
        println("World!")
    
    print("Hello, ")
    Thread.sleep(1500)

这行代码是必现要有的,如果没有这行代码,上面的程序只能打印出 “Hello, ”,而无法打印出“World!”,因为在执行完print("Hello, ")之后,整个进程就结束了,里面的所有线程也就被强制结束了,运行在线程之上的协程也就无法执行了。我们可以将上述代码进行如下改写,也能达到相同的目的:

fun main() = runBlocking 
    val job : Job = GlobalScope.launch 
        delay(1000)
        println("World!")
    
    print("Hello, ")

async & await

除了 launch(),我们还有其他协程构造器,比如 async()、produce()、actor(),其中 async() 方法不会立即启动一个协程,而是在调用 await() 方法时才会启动,需要注意的是 await() 是同步的,比如下面代码中 println("Completed in $time ms") 必须等待2个 await() 执行完毕后再执行:

import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

fun main() = runBlocking(CoroutineName("main")) 
    val time = measureTimeMillis 
        val one = async(CoroutineName("v1"))  one() 
        val two = async(CoroutineName("v2"))  two() 
        log("launched two async")
        log("the answer is $one.await() + two.await()")
    

    println("Completed in $time ms")


suspend fun one(): Int 
    delay(1000)
    log("Computing v1")
    return 19


suspend fun two(): Int 
    delay(500)
    log("Computing v2")
    return 10


fun log(msg: String) = println("[$Thread.currentThread().name] $msg")

运行结果:

[main]launched two async
[main]Computing v1
[main]Computing v1
[main]the answer is 29
Completed in 1020 ms

async() 构造器返回值类型是 DeferredDeferredJob 的子类。

然后是 delay() 方法。这是一个非阻塞的方法。非阻塞的意思是不会让线程由 Run 状态转入 Block 状态,该线程可以继续执行其他代码。相比之下,Thread.sleep() 则是阻塞方法,会使线程处于阻塞状态,无法继续执行。

需要注意的是,GlobalScope 的生命周期与应用相同,一般不建议使用,因为很容易因遗忘cancel()操作而导致内存泄漏。可以使用自定义的作用域:

fun main() = runBlocking 
    coroutineScope 
        launch 
        launch 
    

看到 async() 和 await(),让人不免联想到其他语言中的 async 和 await 关键字,比如 javascript、Dart、Swift 和 C#。我们简单比较下二者的异同:

  • Kotlin 的 async & await 是以扩展方法的形式存在的,其他语言是以关键字的形式存在;
  • Kotlin 的 await() 返回的是一个 Deferred 类型,这与 Js 的 Promise、Dart 的 Future 和 Swift 的 Task 是类似的,执行到 await 都会挂起。

二、取消协程

取消正在运行中的协程的方法有:cancel,withTimeout,抛异常。

cancel()

我们可以调用 CoroutineScope.cancel() 来取消该作用域下的所有协程,包括子孙协程。我们在安卓开发中常见的 mainScope 就是采用的这种方式:

  class MyandroidActivity 
      private val scope = MainScope()
 
      override fun onDestroy() 
          super.onDestroy()
          scope.cancel()
      
  

实际上 CoroutineScope.cancel() 内部调用的是 Job 的 cancel() 实现的:

/**
 * Cancels this scope, including its job and all its children with an optional cancellation [cause].
 * A cause can be used to specify an error message or to provide other details on
 * a cancellation reason for debugging purposes.
 * Throws [IllegalStateException] if the scope does not have a job in it.
 */
public fun CoroutineScope.cancel(cause: CancellationException? = null) 
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)

如注释所述,如果该作用域没有 Job,则抛出 IllegalStateException 异常。

cancel() 方法执行之后 Job 内部发生了什么呢?具体可以看如下状态机:

cancel() 方法只对suspend()方法有效。因为 suspend() 方法执行时会先检查该作用域或Job是否被取消,如果被取消就不再往下执行了。而非 suspend() 方法因为没有这种功能,所以是无法生效的。比如:

import kotlinx.coroutines.*

fun main() = runBlocking 
    val job = launch(Dispatchers.Default) 
        try 
            println("coroutine started")
            delay(1000)
            println("coroutine completed")
         catch (e: Exception) 
            e.printStackTrace()
         finally 
            println("coroutine finally")
        
    

    delay(500)

    println("coroutine canceled")
    job.cancel(CancellationException("i am canceled"))
    job.join()

    println("coroutine ended")

join() 方法的作用是等待所有协程执行完成。
上述代码的执行结果如下:

coroutine started
coroutine canceled
coroutine finally
coroutine ended
java.util.concurrent.CancellationException: i am canceled
	at coroutine.basics.CancelKt$main$1.invokeSuspend(cancel.kt:24)
	at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
	at kotlinx.coroutines.DispatchedTaskKt.resume(DispatchedTask.kt:234)
	at kotlinx.coroutines.DispatchedTaskKt.dispatch(DispatchedTask.kt:166)
	at kotlinx.coroutines.CancellableContinuationImpl.dispatchResume(CancellableContinuationImpl.kt:397)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl(CancellableContinuationImpl.kt:431)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeImpl$default(CancellableContinuationImpl.kt:420)
	at kotlinx.coroutines.CancellableContinuationImpl.resumeUndispatched(CancellableContinuationImpl.kt:518)
	at kotlinx.coroutines.EventLoopImplBase$DelayedResumeTask.run(EventLoop.common.kt:489)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.BlockingCoroutine.joinBlocking(Builders.kt:85)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking(Builders.kt:59)
	at kotlinx.coroutines.BuildersKt.runBlocking(Unknown Source)
	at kotlinx.coroutines.BuildersKt__BuildersKt.runBlocking$default(Builders.kt:38)
	at kotlinx.coroutines.BuildersKt.runBlocking$default(Unknown Source)
	at coroutine.basics.CancelKt.main(cancel.kt:5)
	at coroutine.basics.CancelKt.main(cancel.kt)

协程被取消成功,被取消的时候会抛出 CancellationException。之所以能被取消,是因为协程代码块里面有 delay() 这个 suspend 方法,如果我们把 delay() 换成非 suspend 方法 Thread.sleep(),能被取消成功吗?我们试验下:

fun main() = runBlocking 
    val job = launch(Dispatchers.Default) 
        try 
            println("coroutine started")
            Thread.sleep(1000) // 注意此处不再是 suspend 方法 delay()
            println("coroutine completed")
         catch (e: Exception) 
            e.printStackTrace()
         finally 
            println("coroutine finally")
        
    

    delay(500)

    println("coroutine canceled")
    job.cancel(CancellationException("i am canceled"))
    job.join()

    println("coroutine ended")

输出结果如下:

coroutine started
coroutine canceled
coroutine completed
coroutine finally
coroutine ended

可以看出,协程没有被取消。如果我们想不包含suspend方法的协程被取消,可以手动检查下当前协程的状态,如果 isActive 是 true 才往下执行,否则就结束掉:

fun main() = runBlocking 
    val job = launch(Dispatchers.Default) 
        try 
            println("coroutine started")
            Thread.sleep(1000)
            if (isActive) 
                println("coroutine completed")
            
         catch (e: Exception) 
            e.printStackTrace()
         finally 
            println("coroutine finally")
        
    

    delay(500)

    println("coroutine canceled")
    job.cancel(CancellationException("i am canceled"))
    job.join()

    println("coroutine ended")

输出如下:

coroutine started
coroutine canceled
coroutine finally
coroutine ended

coroutine started
coroutine canceled
coroutine completed
coroutine finally
coroutine ended

需要注意的是,如果协程已经被取消了,在 finally 代码里面使用 suspend 方法是不会执行的:

import kotlinx.coroutines.*

fun main() = runBlocking 
    val job = launch(Dispatchers.Default) 
        try 
            println("coroutine started")
            delay(1000)
         catch (e: Exception) 
            e.printStackTrace()
         finally 
            println("running finally")
            delay(1000)
            println("coroutine finally")
        
    

    delay(500)

    println("coroutine canceled")
    job.cancelAndJoin()
    println("coroutine ended")

输出如下:

coroutine started
coroutine canceled
running finally
coroutine ended
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutineCancelling@7ebf7c43

可以看出对应的这两行代码都没有执行:

            delay(1000)
            println("coroutine finally")

如果我们想让上述两行代码执行,可以使用 withContext(NonCancellable)

package coroutine.basics

import kotlinx.coroutines.*

fun main() = runBlocking 
    val job = launch(Dispatchers.Default) 
        try 
            println("coroutine started")
            delay(1000)
         catch (e: Exception) 
            e.printStackTrace()
         finally 
            withContext(NonCancellable) 
                println("running finally")
                delay(1000)
                println("coroutine finally")
            
        
    

    delay(500)

    println("coroutine canceled")
    job.cancelAndJoin()

    println("coroutine ended")

上述代码输出如下:

coroutine started
coroutine canceled
running finally
kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job=StandaloneCoroutineCancelling@70c1cc32
coroutine finally
coroutine ended

withTimeout

除了 cancel() 方法,我们还可以使用 withTimeout() 来取消协程:

fun main() = runBlocking 
    withTimeout(1300L) 
        repeat(1000)  i ->
            println("i am sleeping $i")
            delay(500)
        
    

在过了 1300ms 后就会取消正在运行的协程,上述代码输出如下:

i am sleeping 0
i am sleeping 1
i am sleeping 2
Exception in thread "main" kotlinx.coroutines.TimeoutCancellationException: Timed out waiting for 1300 ms
	at kotlinx.coroutines.TimeoutKt.TimeoutCancellationException(Timeout.kt:186)
	at kotlinx.coroutines.TimeoutCoroutine.run(Timeout.kt:156)
	at kotlinx.coroutines.EventLoopImplBase$DelayedRunnableTask.run(EventLoop.common.kt:497)
	at kotlinx.coroutines.EventLoopImplBase.processNextEvent(EventLoop.common.kt:274)
	at kotlinx.coroutines.DefaultExecutor.run(DefaultExecutor.kt:69)
	at java.base/java.lang.Thread.run(Thread.java:831)

withTimeout() 通过抛出 TimeoutCancellationException 的方式来结束协程。TimeoutCancellationException 是 CancellationException 的子类。如果我们想在结束时做些额外的操作比如关闭资源等,可以使用 try-catch(e: TimeoutCancellationException) 。或者使用 withTimeoutOrNull,该方法返回null而非抛出异常:

fun main() = runBlocking 
    val ret = withTimeoutOrNull(1300L) 
        repeat(1000)  i ->
            println("i am sleeping $i")
            delay(500)
        
    

    println("Result is $ret")

上述代码输出为:

i am sleeping 0
i am sleeping 1
i am sleeping 2
Result is null

需要注意的是,withTimeout 方法是异步的,是和其包裹着的代码块并发执行的,执行的时机可能是任何时候,包括代码块正在或已经执行完成 return 语句等。正因为此,我们在代码块里面打开或者获取需要关闭的资源时,一定要格外小心。比如我们有一个 Resource 类,来监控被打开和关闭的次数,我们用10w个协程模拟对该资源的打开和关闭操作:

var acquired = 0

class Resource 
    init 
        acquired++
    

    fun close() 
        acquired--
    


fun main() 
    runBlocking 
        repeat(100_000) 
            launch 
                val resource = withTimeout(20) 
                    delay(19)
                    Resource()
                

                resource.close()
            
        
    
    println(acquired)

根据执行环境不同,可以适当调整 20 和 19的数值,我们会发现多执行几次后打印的结果并不总是0。注意,上述代码并不会出现线程安全的问题,因为10w的协程都在是主线程中执行的。那么我们怎么避免这个问题呢?可以存储一个资源的引用,然后使用这个引用释放资源:

fun main() 
    runBlocking 
        repeat(1000_000) 
            launch 
                var resource: Resource? = null
                try 
                    withTimeout(20) 
                        delay(19)
                        resource = Resource()
                    
                 finally 
                    resource?.close()
                
            
        
    

    println(acquired)

这样每次输出都是0了,不会出现资源泄露的情况。

参考文章

以上是关于对比Java学Kotlin协程-创建和取消的主要内容,如果未能解决你的问题,请参考以下文章

对比Java学Kotlin协程-创建和取消

对比Java学Kotlin协程-异步流

对比Java学Kotlin协程-异步流

对比Java学Kotlin协程-异步流

对比Java学Kotlin协程-异步流

对比Java学Kotlin协程简史