Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Posted 韩曙亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )相关的知识,希望对你有一定的参考价值。

文章目录





一、调用 Flow#launchIn 函数指定流收集协程



1、指定流收集协程


响应式编程 , 是 基于事件驱动 的 , 在 Flow 流中会产生源源不断的事件 , 就是 发射元素操作 ;

拿到 Flow 流后 , 开始 收集元素 , 按照顺序逐个处理产生的事件 ( 元素 ) ;


调用 Flow#launchIn 函数 , 传入 协程作用域 作为参数 , 可以 指定 收集 Flow 流元素 的 协程 ;

在上一篇博客 【Kotlin 协程】Flow 异步流 ⑤ 中 , 调用 Flow#flowOn 函数 , 可以 指定 Flow 流发射元素 的 协程 ;


Flow#launchIn 函数返回值是 Job 对象 , 是 协程任务对象 , 可调用 Job#cancel 函数取消该协程任务 ;


2、Flow#launchIn 函数原型


Flow#launchIn 函数原型 :

/**
 * 终端流操作符,在[作用域]中[启动][启动]给定流的[收集][收集]。
 * 它是“范围”(scope)的简称。启动flow.collect() '。
 *
 * 此操作符通常与[onEach], [onCompletion]和[catch]操作符一起使用,以处理所有发出的值
 * 处理上游流或处理过程中可能发生的异常,例如:
 *
 * ```
 * flow
 *     .onEach  value -> updateUi(value) 
 *     .onCompletion  cause -> updateUi(if (cause == null) "Done" else "Failed") 
 *     .catch  cause -> LOG.error("Exception: $cause") 
 *     .launchIn(uiScope)
 * ```
 *
 * 注意,[launchIn]的结果值没有被使用,提供的作用域负责取消。
 */
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch 
    collect() // tail-call


3、代码示例


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 携程中调用挂起函数返回一个 Flow 异步流
        runBlocking 
            println("流收集时的协程上下文 : $Thread.currentThread().name")

            flowEvent()
                .onEach 
                    // 逐个处理产生的事件
                    println("接收到事件 : $it, 当前线程 : $Thread.currentThread().name")
                
                .launchIn(CoroutineScope(Dispatchers.IO)) // 在指定的协程作用域中处理收集元素操作,
                                                          // 该 launchIn 函数返回一个 Job 对象
                .join()     // 该协程不是 runBlocking 主协程 的子协程, 需要调用 join 等待协程执行完毕
        
    

    /**
     * 使用 flow 构建器 Flow 异步流
     * 产生事件的 事件源
     */
    suspend fun flowEvent() = (0..3)
        .asFlow()                               // 将区间转为 Flow 流
        .onEach 
            delay(500)
            println("发射事件 : $it, 当前线程 : $Thread.currentThread().name")
              // 发射元素 ( 产生事件 ) 时挂起 500ms
        .flowOn(Dispatchers.Default)           // 设置发射元素的协程

执行结果 :

2022-12-23 16:06:58.720 2950-2950/kim.hsl.coroutine I/System.out: 流收集时的协程上下文 : main
2022-12-23 16:06:59.345 2950-3080/kim.hsl.coroutine I/System.out: 发射事件 : 0, 当前线程 : DefaultDispatcher-worker-3
2022-12-23 16:06:59.347 2950-3078/kim.hsl.coroutine I/System.out: 接收到事件 : 0, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:06:59.885 2950-3078/kim.hsl.coroutine I/System.out: 发射事件 : 1, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:06:59.887 2950-3079/kim.hsl.coroutine I/System.out: 接收到事件 : 1, 当前线程 : DefaultDispatcher-worker-2
2022-12-23 16:07:00.394 2950-3080/kim.hsl.coroutine I/System.out: 发射事件 : 2, 当前线程 : DefaultDispatcher-worker-3
2022-12-23 16:07:00.396 2950-3080/kim.hsl.coroutine I/System.out: 接收到事件 : 2, 当前线程 : DefaultDispatcher-worker-3
2022-12-23 16:07:00.938 2950-3078/kim.hsl.coroutine I/System.out: 发射事件 : 3, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:07:00.940 2950-3079/kim.hsl.coroutine I/System.out: 接收到事件 : 3, 当前线程 : DefaultDispatcher-worker-2





二、通过取消流收集所在的协程取消流



Flow 流的 收集元素 操作 , 是在协程中执行 , 将 协程 取消 , 即可将 Flow 流收集操作 取消 , 也就是 将 Flow 流取消 ;


代码示例 : 使用 withTimeoutOrNull(2000) 创建一个协程 , 该协程在 2000ms 后自动超时取消 , 同时在其中进行 流收集 的操作也一并取消 ;

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 携程中调用挂起函数返回一个 Flow 异步流
        runBlocking 
            // 该协程作用域 2 秒后超时取消
            withTimeoutOrNull(2000)
                flowEvent().collect 
                    println("收集到元素 : $it")
                
            
            println("协程作用域取消")
        
    

    /**
     * 使用 flow 构建器 Flow 异步流
     * 产生事件的 事件源
     */
    suspend fun flowEvent() = (0..10)
        .asFlow()                               // 将区间转为 Flow 流
        .onEach 
            delay(500)
            println("发射事件 : $it, 当前线程 : $Thread.currentThread().name")
              // 发射元素 ( 产生事件 ) 时挂起 500ms
        .flowOn(Dispatchers.Default)           // 设置发射元素的协程

执行结果 :

2022-12-23 16:37:02.915 9585-9647/kim.hsl.coroutine I/System.out: 发射事件 : 0, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:37:02.917 9585-9585/kim.hsl.coroutine I/System.out: 收集到元素 : 0
2022-12-23 16:37:03.429 9585-9647/kim.hsl.coroutine I/System.out: 发射事件 : 1, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:37:03.431 9585-9585/kim.hsl.coroutine I/System.out: 收集到元素 : 1
2022-12-23 16:37:03.932 9585-9647/kim.hsl.coroutine I/System.out: 发射事件 : 2, 当前线程 : DefaultDispatcher-worker-1
2022-12-23 16:37:03.933 9585-9585/kim.hsl.coroutine I/System.out: 收集到元素 : 2
2022-12-23 16:37:04.327 9585-9585/kim.hsl.coroutine I/System.out: 协程作用域取消

以上是关于Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )

Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )

Kotlin 协程Flow 异步流 ⑦ ( 调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测 | 启用检测 Flow 流的取消cancellable函数 )

Kotlin 协程Flow 异步流 ⑦ ( 调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测 | 启用检测 Flow 流的取消cancellable函数 )

Kotlin 协程Flow 异步流 ③ ( 冷流 | 流被收集时运行 | 流的连续性 )

Kotlin 协程Flow 异步流 ③ ( 冷流 | 流被收集时运行 | 流的连续性 )