Kotlin 协程Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )

Posted 韩曙亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )相关的知识,希望对你有一定的参考价值。

文章目录





一、Flow 流展平



Flow 流在 接收元素 时 , 可能需要 另一个 流的元素 , 两个流之间进行 交互的操作 就是 展平 , 常见的 展平模式有 :

  • 连接模式 flatMapConcat : m 个元素的流 与 n 个元素的流 连接后 , 元素个数为 m x n 个 ;
  • 合并模式 flatMapMerge : m 个元素的流 与 n 个元素的流 合并后 , 元素个数为 n x m 个 ;
  • 最新展平模式 flatMapLatest : 前面的看时间间隔进行结合 , 中间的可能跳过某些元素 , 不要中间值 , 只重视最新的数据 ;

1、连接模式 flatMapConcat 代码示例


连接模式 flatMapConcat : m 个元素的流 与 n 个元素的流 连接后 , 元素个数为 m x n 个 ;

flatMapConcat 函数原型 :

/**
 * 通过应用[transform]转换原始流发出的元素,它返回另一个流,
 * 然后连接并压平这些流。
 *
 * 该方法是' map(transform).flattenConcat() '的快捷方式。看到[flattenConcat]。
 *
 * 请注意,尽管这个操作符看起来非常熟悉,但我们不鼓励在常规的特定于应用程序的流中使用它。
 * 最有可能的是,暂停[map]操作符中的操作就足够了,线性转换更容易推理。
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapConcat(transform: suspend (value: T) -> Flow<R>): Flow<R> =
    map(transform).flattenConcat()

调用 FlowA.flatMapConcat(FlowB) 代码 , 先拿到 FlowA , 然后让 FlowA 每个元素 与 FlowB 进行连接 , 以 FlowA 的元素顺序为主导 ;


代码示例 : 注意 两个 流 连接后的间隔 , (0…2) 流之间的发射间隔 100ms , stringFlow 流元素发射间隔 200ms , 连接后的流要结合上述两个间隔 , 在 (0…2) 流 的元素之间间隔为 100ms , 在 (0…2) 流单个元素与所有的 stringFlow 流元素连接的间隔为 200ms ;

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking 
            val startTime = System.currentTimeMillis()
            (0..2).asFlow()
                        .onEach  delay(100) 
                        // 该 Flow 流与  stringFlow 进行连接
                        .flatMapConcat  stringFlow(it) 
                        .collect 
                            println("收集到元素 $it, 时间 $System.currentTimeMillis() - startTime")
                        
        
    

    suspend fun stringFlow(num: Int) = flow<String> 
        emit("$num flatMapContact Hello First")
        delay(200)
        emit("$num flatMapContact Hello Second")
    

执行结果 :

I/System.out: 收集到元素 0 flatMapContact Hello First, 时间 201
I/System.out: 收集到元素 0 flatMapContact Hello Second, 时间 448
I/System.out: 收集到元素 1 flatMapContact Hello First, 时间 608
I/System.out: 收集到元素 1 flatMapContact Hello Second, 时间 837
I/System.out: 收集到元素 2 flatMapContact Hello First, 时间 954
I/System.out: 收集到元素 2 flatMapContact Hello Second, 时间 1196


2、合并模式 flatMapMerge 代码示例


合并模式 flatMapMerge : m 个元素的流 与 n 个元素的流 合并后 , 元素个数为 n x m 个 ;

flatMapMerge 函数原型 :

/**
 * 通过应用[transform]转换原始流发出的元素,它返回另一个流,
 * 然后合并并压平这些气流。
 *
 * 此操作符按顺序调用[transform],然后将结果流与[concurrency]合并
 * 对并发收集流的数量的限制。
 * 它是' map(transform).flattenMerge(concurrency)'的快捷方式。
 * 详见[flattenMerge]。
 *
 * 请注意,尽管这个操作符看起来非常熟悉,但我们不鼓励在常规的特定于应用程序的流中使用它。
 * 最有可能的是,暂停[map]操作符中的操作就足够了,线性转换更容易推理。
 *
 * ###算子融合
 *
 * [flowOn]、[buffer]和[produceIn] __after_此操作符的应用被融合
 * 它是并发合并,因此只有一个正确配置的通道用于执行合并逻辑。
 *
 * @param并发控制运行中的流的数量,最多收集[concurrency]个流
 * 同时。默认情况下,它等于[DEFAULT_CONCURRENCY]。
 */
@FlowPreview
public fun <T, R> Flow<T>.flatMapMerge(
    concurrency: Int = DEFAULT_CONCURRENCY,
    transform: suspend (value: T) -> Flow<R>
): Flow<R> =
    map(transform).flattenMerge(concurrency)

调用 FlowA.flatMapMerge(FlowB) 代码 , 先拿到 FlowB , 然后让 FlowB 每个元素 与 FlowA 进行结合 , 以 FlowB 的元素顺序为主导 ;


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking 
            val startTime = System.currentTimeMillis()
            (0..2).asFlow()
                        .onEach  delay(100) 
                        // 该 Flow 流与  stringFlow 进行合并
                        .flatMapMerge  stringFlow(it) 
                        .collect 
                            println("收集到元素 $it, 时间 $System.currentTimeMillis() - startTime")
                        
        
    

    suspend fun stringFlow(num: Int) = flow<String> 
        emit("$num flatMapMerge Hello First")
        delay(500)
        emit("$num flatMapMerge Hello Second")
    

执行结果 :

I/System.out: 收集到元素 0 flatMapMerge Hello First, 时间 192
I/System.out: 收集到元素 1 flatMapMerge Hello First, 时间 328
I/System.out: 收集到元素 2 flatMapMerge Hello First, 时间 451
I/System.out: 收集到元素 0 flatMapMerge Hello Second, 时间 698
I/System.out: 收集到元素 1 flatMapMerge Hello Second, 时间 866
I/System.out: 收集到元素 2 flatMapMerge Hello Second, 时间 993


3、最新展平模式 flatMapLatest 代码示例


最新展平模式 flatMapLatest : 前面的看时间间隔进行结合 , 中间的可能跳过某些元素 , 不要中间值 , 只重视最新的数据 ;

flatMapLatest 函数原型 :

/**
 * 返回一个流,每当原始流发出一个值时,该流切换到[transform]函数生成的新流。
 * 当原始流产生一个新值时,由' transform '块产生的前一个流将被取消。
 *
 * 例如,以下流程:
 * ```
 * flow 
 *     emit("a")
 *     delay(100)
 *     emit("b")
 * .flatMapLatest  value ->
 *     flow 
 *         emit(value)
 *         delay(200)
 *         emit(value + "_last")
 *     
 * 
 * ```
 * produces `a b b_last`
 *
 * 该操作符默认为[buffered][buffer],其输出缓冲区的大小可以通过应用后续的[buffer]操作符来改变。
 */
@ExperimentalCoroutinesApi
public inline fun <T, R> Flow<T>.flatMapLatest(@BuilderInference crossinline transform: suspend (value: T) -> Flow<R>): Flow<R> =
    transformLatest  emitAll(transform(it)) 

代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking 
            val startTime = System.currentTimeMillis()
            (0..2).asFlow()
                        .onEach  delay(100) 
                        // 该 Flow 流与  stringFlow 进行合并
                        .flatMapLatest  stringFlow(it) 
                        .collect 
                            println("收集到元素 $it, 时间 $System.currentTimeMillis() - startTime")
                        
        
    

    suspend fun stringFlow(num: Int) = flow<String> 
        emit("$num flatMapLatest Hello First")
        delay(500)
        emit("$num flatMapLatest Hello Second")
    

执行结果 :

I/System.out: 收集到元素 0 flatMapLatest Hello First, 时间 233
I/System.out: 收集到元素 1 flatMapLatest Hello First, 时间 381
I/System.out: 收集到元素 2 flatMapLatest Hello First, 时间 547
I/System.out: 收集到元素 2 flatMapLatest Hello Second, 时间 1079

以上是关于Kotlin 协程Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )

Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )