Kotlin 协程Flow 异步流 ⑦ ( 调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测 | 启用检测 Flow 流的取消cancellable函数 )

Posted 韩曙亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程Flow 异步流 ⑦ ( 调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测 | 启用检测 Flow 流的取消cancellable函数 )相关的知识,希望对你有一定的参考价值。

文章目录





一、调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测



Flow 流构建器 中 , 每次 调用 FlowCollector#emit 发射元素时 ,

都会执行一个 ensureActive 检测 , 检测当前的流是否取消 ,

因此 , 在 flow 流构建器 中 , 循环执行的 FlowCollector#emit 发射操作 , 是可以取消的 ;


在 Flow#collect 代码块中 , 执行 Job#cancel 函数 , 即可 取消该流收集操作所在的协程 , 进而取消了流 ;

/**
 * 用一个可选的cancel [cause]取消这个作用域,包括它的作业和它的所有子任务。
 * 原因可用于指定错误消息或提供有关的其他详细信息
 * 用于调试目的的取消原因。
 * 如果作用域中没有作业,则抛出[IllegalStateException]。
 */
public fun CoroutineScope.cancel(cause: CancellationException? = null) 
    val job = coroutineContext[Job] ?: error("Scope cannot be cancelled because it does not have a job: $this")
    job.cancel(cause)


代码示例 : 在收集元素时 , 收集几个元素后 , 执行 Flow#cancel 函数 , 取消流收集所在协程 ;

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 携程中调用挂起函数返回一个 Flow 异步流
        runBlocking 
            flowEvent().collect
                println("收集元素 $it")

                if (it == 2) 
                    // 收集到元素 2 时, 取消流
                    // 在流中 emit 发射 3 时, 就会自动爆出异常, 停止后续操作
                    cancel()
                
            
        
    

    /**
     * 使用 flow 构建器 Flow 异步流
     */
    suspend fun flowEvent() = flow<Int> 

        for(i in 0..5) 
            delay(1000)
            emit(i)
            println("发射元素 $i")
        
    

执行结果 :

2022-12-23 18:16:41.610 29409-29409/kim.hsl.coroutine I/System.out: 收集元素 0
2022-12-23 18:16:41.611 29409-29409/kim.hsl.coroutine I/System.out: 发射元素 0
2022-12-23 18:16:42.614 29409-29409/kim.hsl.coroutine I/System.out: 收集元素 1
2022-12-23 18:16:42.614 29409-29409/kim.hsl.coroutine I/System.out: 发射元素 1
2022-12-23 18:16:43.655 29409-29409/kim.hsl.coroutine I/System.out: 收集元素 2
2022-12-23 18:16:43.658 29409-29409/kim.hsl.coroutine I/System.out: 发射元素 2
2022-12-23 18:16:43.661 29409-29409/kim.hsl.coroutine D/AndroidRuntime: Shutting down VM
    
    
    --------- beginning of crash
2022-12-23 18:16:43.665 29409-29409/kim.hsl.coroutine E/AndroidRuntime: FATAL EXCEPTION: main
    Process: kim.hsl.coroutine, PID: 29409
    java.lang.RuntimeException: Unable to start activity ComponentInfokim.hsl.coroutine/kim.hsl.coroutine.MainActivity: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutineCancelled@daf39f2
        at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2951)
        at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3086)
        at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
        at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
        at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
        at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1816)
        at android.os.Handler.dispatchMessage(Handler.java:106)
        at android.os.Looper.loop(Looper.java:193)
        at android.app.ActivityThread.main(ActivityThread.java:6718)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)
     Caused by: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutineCancelled@daf39f2
2022-12-23 18:16:43.695 29409-29409/kim.hsl.coroutine I/Process: Sending signal. PID: 29409 SIG: 9





二、调用 Flow#cancellable() 函数启用检测 Flow 流的取消



在 Flow 流中 , 除 FlowCollector#emit 发射元素 之外 ,

还有很多其它的 流操作 , 这些操作不会 自动执行 ensureActive 检测 ,

因此这里需要我们 手动 进行 流取消检测 ;


调用 Flow#cancellable() 函数 , 可以手动设置流取消检测 ;


1、流取消失败代码示例


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 携程中调用挂起函数返回一个 Flow 异步流
        runBlocking 

            (0..5).asFlow().collect 
                println("收集到元素 $it")

                // 收集到元素 2 时, 协程退出
                if (it == 2) 
                    cancel()
                
            

        
    

执行结果 :

2022-12-23 18:24:01.821 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 0
2022-12-23 18:24:01.822 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 1
2022-12-23 18:24:01.822 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 2
2022-12-23 18:24:01.827 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 3
2022-12-23 18:24:01.827 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 4
2022-12-23 18:24:01.827 30105-30105/kim.hsl.coroutine I/System.out: 收集到元素 5
2022-12-23 18:24:01.829 30105-30105/kim.hsl.coroutine D/AndroidRuntime: Shutting down VM
    
    
    --------- beginning of crash
2022-12-23 18:24:01.832 30105-30105/kim.hsl.coroutine E/AndroidRuntime: FATAL EXCEPTION: main
    Process: kim.hsl.coroutine, PID: 30105
    java.lang.RuntimeException: Unable to start activity ComponentInfokim.hsl.coroutine/kim.hsl.coroutine.MainActivity: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutineCancelled@daf39f2
        at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2951)
        at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3086)
        at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
        at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
        at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
        at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1816)
        at android.os.Handler.dispatchMessage(Handler.java:106)
        at android.os.Looper.loop(Looper.java:193)
        at android.app.ActivityThread.main(ActivityThread.java:6718)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)
     Caused by: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutineCancelled@daf39f2
2022-12-23 18:24:01.856 30105-30105/? I/Process: Sending signal. PID: 30105 SIG: 9


2、启用检测 Flow 流的取消代码示例


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 携程中调用挂起函数返回一个 Flow 异步流
        runBlocking 

            // 执行 Flow#cancellable 启用手动执行流取消检测
            (0..5).asFlow().cancellable().collect 
                println("收集到元素 $it")

                // 收集到元素 2 时, 协程退出
                if (it == 2) 
                    cancel()
                
            
        
    

执行结果 :

2022-12-23 18:28:40.139 31502-31502/kim.hsl.coroutine I/System.out: 收集到元素 0
2022-12-23 18:28:40.140 31502-31502/kim.hsl.coroutine I/System.out: 收集到元素 1
2022-12-23 18:28:40.140 31502-31502/kim.hsl.coroutine I/System.out: 收集到元素 2
2022-12-23 18:28:40.145 31502-31502/kim.hsl.coroutine D/AndroidRuntime: Shutting down VM
2022-12-23 18:28:40.149 31502-31502/kim.hsl.coroutine E/AndroidRuntime: FATAL EXCEPTION: main
    Process: kim.hsl.coroutine, PID: 31502
    java.lang.RuntimeException: Unable to start activity ComponentInfokim.hsl.coroutine/kim.hsl.coroutine.MainActivity: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutineCancelled@daf39f2
        at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2951)
        at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3086)
        at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
        at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
        at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
        at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1816)
        at android.os.Handler.dispatchMessage(Handler.java:106)
        at android.os.Looper.loop(Looper.java:193)
        at android.app.ActivityThread.main(ActivityThread.java:6718)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)
     Caused by: kotlinx.coroutines.JobCancellationException: BlockingCoroutine was cancelled; job=BlockingCoroutineCancelled@daf39f2
2022-12-23 18:28:40.169 31502-31502/? I/Process: Sending signal. PID: 31502 SIG: 9

以上是关于Kotlin 协程Flow 异步流 ⑦ ( 调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测 | 启用检测 Flow 流的取消cancellable函数 )的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )

Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )

Kotlin 协程Flow 异步流 ③ ( 冷流 | 流被收集时运行 | 流的连续性 )

Kotlin 协程Flow 异步流 ③ ( 冷流 | 流被收集时运行 | 流的连续性 )