Kotlin 协程协程取消 ② ( CPU 密集型协程任务取消 | 使用 isActive 判定协程状态 | 使用 ensureActive 函数取消协程 | 使用 yield 函数取消协程 )

Posted 韩曙亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程协程取消 ② ( CPU 密集型协程任务取消 | 使用 isActive 判定协程状态 | 使用 ensureActive 函数取消协程 | 使用 yield 函数取消协程 )相关的知识,希望对你有一定的参考价值。

文章目录





一、CPU 密集型协程任务取消



在 协程中 , 定义在 kotlinx.coroutines 包下的 suspend 挂起函数 是可以取消的 ;

但是有一种协程任务 , CPU 密集型协程任务 , 是无法 直接取消的 ; 此类任务一直在 抢占 CPU 资源 , 使用 cancel 函数 , 无法取消该类型的 协程任务 ;

在进行 CPU 密集计算时 , 中间会有大量的中间数据 , 如果中途取消 , 大量的临时数据会丢失 , 因此在协程中 , 无法直接取消 CPU 密集型协程任务 , 这是对协程的保护措施 ;


CPU 密集型协程任务取消示例 : 在下面的 协程任务 中 , 循环 10000000 次 , 100 ms 后取消 ;

package kim.hsl.coroutine

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*

class MainActivity : AppCompatActivity()
    val TAG = "MainActivity"

    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking 
            // 创建协程作用域
            val coroutineScope = CoroutineScope(Dispatchers.Default)

            val job1 = coroutineScope.launch 
                Log.i(TAG, "协程任务执行开始")
                var i = 0
                while (i < 10000000) 
                    var j = i + 1
                    i++
                    if(j == 10000000) 
                        Log.i(TAG, "最后一次循环 : j = $j")
                        Log.i(TAG, "协程任务执行完毕")
                    
                
            

            // 100ms 后取消协程作用域
            delay(10)

            Log.i(TAG, "取消协程任务")
            // 取消协程任务
            job1.cancelAndJoin()
            Log.i(TAG, "退出协程作用域")
        
    

执行结果 : 在执行协程任务过程中 , 取消协程 , 但是没有取消成功 , 协程自动执行完毕 ;

18:45:33.896  I  协程任务执行开始
18:45:33.906  I  取消协程任务
18:45:33.997  I  最后一次循环 : j = 10000000
18:45:33.997  I  协程任务执行完毕
18:45:34.001  I  退出协程作用域





二、使用 isActive 判定当前 CPU 密集型协程任务是否取消



协程 处于 活跃 Active 状态 时 , 当调用 Job#cancel 函数取消协程时 , 当前的任务会变为 取消中 Cancelling 状态 ,

取消中 Cancelling 状态 通过 ( isActive == false && isCancelled == true ) 可以进行判定 ;

当所有的子协程执行完毕会后 , 协程会进入 已取消 Cancelled 状态 ,

已取消 Cancelled 状态 通过 ( isCompleted == true ) 进行判定 ;


如果 调用了 Job#cancel 函数 取消协程 , 此时的 isActive 值肯定为 false , 这里在 CPU 密集型协程任务 执行时 , 时刻调用 isActive 判定当前状态即可 ;

如 : 在下面的代码中 , 每次循环都判定一次 isActive 是否为 true , 如果为 false , 则终止循环 , 即终止协程 ;

val job1 = coroutineScope.launch 
    Log.i(TAG, "协程任务执行开始")
    var i = 0
    while (i < 10000000 && isActive) 
        var j = i + 1
        i++
        if(j == 10000000) 
            Log.i(TAG, "最后一次循环 : j = $j")
            Log.i(TAG, "协程任务执行完毕")
        
    


协程声明周期状态 参考 【Kotlin 协程】协程启动 ⑥ ( 协程生命周期状态 | 新创建 New | 活跃 Active | 完成中 Completing | 已完成 Completed | 取消中 | 已取消 )

代码示例 : 在下面的代码中 , 执行 CPU 密集型任务 , 循环 10000000 次进行运算 , 然后在每次循环时 , 都调用 isActive 判定当前的协程是否被取消 ;

package kim.hsl.coroutine

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*

class MainActivity : AppCompatActivity()
    val TAG = "MainActivity"

    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking 
            // 创建协程作用域
            val coroutineScope = CoroutineScope(Dispatchers.Default)

            val job1 = coroutineScope.launch 
                Log.i(TAG, "协程任务执行开始")
                var i = 0
                while (i < 10000000 && isActive) 
                    var j = i + 1
                    i++
                    if(j == 10000000) 
                        Log.i(TAG, "最后一次循环 : j = $j")
                        Log.i(TAG, "协程任务执行完毕")
                    
                
            

            // 100ms 后取消协程作用域
            delay(10)

            Log.i(TAG, "取消协程任务")
            // 取消协程任务
            job1.cancelAndJoin()
            Log.i(TAG, "退出协程作用域")
        
    

执行结果 :

19:44:23.632  I  协程任务执行开始
19:44:23.675  I  取消协程任务
19:44:23.680  I  退出协程作用域





三、使用 ensureActive 自动处理协程退出



在协程中 , 可以执行 ensureActive() 函数 , 在该函数中会 自自动判定当前的 isActive 状态 , 如果当前处于取消中状态 , 自动抛出 CancellationException 异常 , 并退出协程 ;

/**
 * 确保当前作用域是[活动的][CoroutineScope.isActive]。
 *
 * 如果作业不再活动,则抛出[CancellationException]。
 * 如果作业被取消,则抛出异常包含原始的取消原因。
 * 如果作用域的[coroutineContext][CoroutineScope.coroutineContext]中没有[Job],则此函数不做任何事情。
 *
 * 这个方法可以替代以下代码,但有更精确的例外:
 * ```
 * if (!isActive) 
 *     throw CancellationException()
 * 
 * ```
 *
 * @see CoroutineContext.ensureActive
 */
public fun CoroutineScope.ensureActive(): Unit = coroutineContext.ensureActive()

其真实操作如下 :

public fun Job.ensureActive(): Unit 
    if (!isActive) throw getCancellationException()

核心代码示例 : 协程中执行的循环任务 , 每次循环时 , 都调用一次 ensureActive() 函数 , 判断当前协程是否已经取消 , 如果已经取消则抛出异常 , 退出协程 ;

val job1 = coroutineScope.launch 
    Log.i(TAG, "协程任务执行开始")
    var i = 0
    while (i < 10000000) 
        ensureActive()
        var j = i + 1
        i++
    


完整代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext

class MainActivity : AppCompatActivity()
    val TAG = "MainActivity"

    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking 
            // 创建协程作用域
            val coroutineScope = CoroutineScope(Dispatchers.Default)

            val job1 = coroutineScope.launch 
                Log.i(TAG, "协程任务执行开始")
                var i = 0
                while (i < 10000000) 
                    ensureActive()
                    var j = i + 1
                    i++
                    if(j == 10000000) 
                        Log.i(TAG, "最后一次循环 : j = $j")
                        Log.i(TAG, "协程任务执行完毕")
                    
                
            

            // 100ms 后取消协程作用域
            delay(10)

            Log.i(TAG, "取消协程任务")
            // 取消协程任务
            job1.cancelAndJoin()
            Log.i(TAG, "退出协程作用域")
        
    

执行结果 :

19:44:23.632  I  协程任务执行开始
19:44:23.675  I  取消协程任务
19:44:23.680  I  退出协程作用域





四、使用 yield 函数检查协程状态并处理协程取消操作



在协程中 , 可以使用 yield() 函数 , 检查当前协程的状态 , 如果已经调用 cancel() 函数取消协程 , 则抛出 CancellationException 异常 , 取消协程 ;

yield() 函数 比 ensureActive 函数 更加复杂 , 该函数还尝试出让线程执行权 , 将执行权让给别的协程执行 ; yield() 函数 会在每次循环时 , 都执行一次 , 每次循环时都执行该函数的时候 , 此时会尝试出让线程的执行权 , 看看是否有其它更紧急的协程需要执行 , 如果有 , 则让其它协程先执行 ;

yield() 函数 每次执行前都问一下其它协程 , 你们需要执行吗 , 如果需要先让你们执行一次 ;

这样可以避免 协程的 CPU 占用太密集 , 导致其它协程无法执行 的情况 ;


yield() 函数原型 :

/**
 * 生成当前协程分配器的线程(或线程池)
 * 到同一调度程序上运行的其他协程。
 *
 * 这个暂停功能是可以取消的。
 * 如果在调用此挂起函数时取消或完成当前协程的[Job]
 * 这个函数正在等待调度,它会以[CancellationException]恢复。
 * 有**立即取消的保证**。如果在此函数被取消时作业被取消
 * 挂起后,它将无法成功恢复。有关底层细节,请参阅[suspendCancellableCoroutine]文档。
 *
 * **注意**:这个函数总是[检查取消][ensureActive],即使它没有挂起。
 *
 * ###实现细节
 *
 * 如果协程调度程序为[unrestricted][Dispatchers.]无侧限),这
 * 函数仅在有其他无限制协程工作并形成事件循环时才挂起。
 * 对于其他调度程序,该函数调用[CoroutineDispatcher]。调度),
 * 无论[CoroutineDispatcher.isDispatchNeeded]的结果如何,总是挂起以便稍后恢复。
 * 如果上下文中没有[CoroutineDispatcher],它就不会挂起。
 */
public suspend fun yield(): Unit = suspendCoroutineUninterceptedOrReturn sc@  uCont ->
    val context = uCont.context
    context.ensureActive()
    val cont = uCont.intercepted() as? DispatchedContinuation<Unit> ?: return@sc Unit
    if (cont.dispatcher.isDispatchNeeded(context)) 
        // 这是一个常规的分派器——执行简单的dispatchYield
        cont.dispatchYield(context, Unit)
     else 
        // 这要么是“即时”调度程序,要么是无限制调度程序
        // 此代码检测unrestricted调度程序,即使它被包装到另一个调度程序中
        val yieldContext = YieldContext()
        cont.dispatchYield(context + yieldContext, Unit)
        // 仅能在已有的无约束循环中屈服的无约束调度程序的特殊情况
        if (yieldContext.dispatcherWasUnconfined) 
            // 说明无限制调度员接到了电话,但什么都没做。
            // 参见“无限制”代码。调度”功能。
            return@sc if (cont.yieldUndispatched()) COROUTINE_SUSPENDED else Unit
        
        // 否则,就是其他调度程序成功地调度了协程
    
    COROUTINE_SUSPENDED


完整代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import android.util.Log
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.intrinsics.COROUTINE_SUSPENDED
import kotlin.coroutines.intrinsics.intercepted
import kotlin.coroutines.intrinsics.suspendCoroutineUninterceptedOrReturn

class MainActivity : AppCompatActivity()
    val TAG = "MainActivity"

    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking 
            // 创建协程作用域
            val coroutineScope = CoroutineScope(Dispatchers.Default)

            val job1 = coroutineScope.launch 
                Log.i(TAG, "协程任务执行开始")
                var i = 0
                while (i < 10000000) 
                    yield()
                    var j = i + 1
                    i++
                    if(j == 10000000) 
                        Log.i(TAG, "最后一次循环 : j = $j")
                        Log.i(TAG, "协程任务执行完毕")
                    
                
            

            // 100ms 后取消协程作用域
            delay(10)

            Log.i(TAG, "取消协程任务")
            // 取消协程任务
            job1.cancelAndJoin()
            Log.i(TAG, "退出协程作用域")
        
    

执行结果 :

20:20:59.008  I  协程任务执行开始
20:20:59.055  I  取消协程任务
20:20:59.059  I  退出协程作用域

以上是关于Kotlin 协程协程取消 ② ( CPU 密集型协程任务取消 | 使用 isActive 判定协程状态 | 使用 ensureActive 函数取消协程 | 使用 yield 函数取消协程 )的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程协程异常处理 ② ( SupervisorJob 协程 | supervisorScope 协程作用域构建器函数 )

Kotlin 协程协程异常处理 ② ( SupervisorJob 协程 | supervisorScope 协程作用域构建器函数 )

Kotlin 协程协程的挂起和恢复 ② ( 协程挂起 和 线程阻塞 对比 )

Kotlin 协程协程的挂起和恢复 ② ( 协程挂起 和 线程阻塞 对比 )

Kotlin 协程协程底层实现 ② ( 协程调度器 | 协程任务泄漏 | 结构化并发 )

Kotlin 协程协程底层实现 ② ( 协程调度器 | 协程任务泄漏 | 结构化并发 )