Kotlin 协程Flow 流组合 ( Flow#zip 组合多个流 | 新组合流的元素收集间隔与被组合流元素发射间隔的联系 )

Posted 韩曙亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程Flow 流组合 ( Flow#zip 组合多个流 | 新组合流的元素收集间隔与被组合流元素发射间隔的联系 )相关的知识,希望对你有一定的参考价值。

文章目录





一、Flow 流组合




1、Flow#zip 组合多个流


调用 Flow#zip 函数 , 可以将两个 Flow 流合并为一个流 ;


Flow#zip 函数原型 :

/**
 * 将来自当前流(' this ')的值压缩到[其他]流,使用提供的[transform]函数应用到每对值。
 * 在剩下的流上调用一个流完成和取消时,生成的流就会完成。
 *
 * 可以用下面的例子来演示:
 * ```
 * val flow = flowOf(1, 2, 3).onEach  delay(10) 
 * val flow2 = flowOf("a", "b", "c", "d").onEach  delay(15) 
 * flow.zip(flow2)  i, s -> i.toString() + s .collect 
 *     println(it) // Will print "1a 2b 3c"
 * 
 * ```
 *
 * ### 缓冲
 *
 * 上游流在同一协程中按顺序收集,而不进行任何缓冲
 * [other]流被并发收集,就像使用' buffer(0) '一样。参见[buffer]操作符中的文档
 * 为解释。您可以根据需要使用对[buffer]操作符的额外调用,以获得更多并发性。
 */
public fun <T1, T2, R> Flow<T1>.zip(other: Flow<T2>, transform: suspend (T1, T2) -> R): Flow<R> = zipImpl(this, other, transform)

代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking 
            val numFlow = (1..3).asFlow()
            val strFlow = flowOf("One", "Two", "Three")
            // 合并两个 Flow 流
            numFlow.zip(strFlow)  num, str ->
                "num = $num, str = $str"
            .collect 
                println(it)
            
        
    

执行结果 :

2022-12-26 16:39:29.428 30002-30002/kim.hsl.coroutine I/System.out: num = 1, str = One
2022-12-26 16:39:29.429 30002-30002/kim.hsl.coroutine I/System.out: num = 2, str = Two
2022-12-26 16:39:29.433 30002-30002/kim.hsl.coroutine I/System.out: num = 3, str = Three


2、新组合流的元素收集间隔与被组合流元素发射间隔的联系


假如两个 Flow 流的 元素发射 不同步 , 则 先发射的元素 , 需要等待对应顺序的 后发射的元素到来 ;

在下面的代码中 , numFlow 的发射元素间隔为 100ms , strFlow 发射元素间隔为 1000ms , 则 numFlow 元素收集到之后 , 需要等待 strFlow 元素收集 , 也就是 二者合并后的间隔以 慢的为准 , 合并后的流 发射间隔为 1000ms ;


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.*
import kotlinx.coroutines.flow.*

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        runBlocking 
            val numFlow = (1..3).asFlow().onEach  delay(100) 
            val strFlow = flowOf("One", "Two", "Three").onEach  delay(1000) 
            // 合并两个 Flow 流
            numFlow.zip(strFlow)  num, str ->
                "num = $num, str = $str"
            .collect 
                println(it)
            
        
    

执行结果 :

2022-12-26 16:48:43.931 31838-31838/kim.hsl.coroutine I/System.out: num = 1, str = One
2022-12-26 16:48:44.949 31838-31838/kim.hsl.coroutine I/System.out: num = 2, str = Two
2022-12-26 16:48:45.956 31838-31838/kim.hsl.coroutine I/System.out: num = 3, str = Three

以上是关于Kotlin 协程Flow 流组合 ( Flow#zip 组合多个流 | 新组合流的元素收集间隔与被组合流元素发射间隔的联系 )的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )

Kotlin 协程Flow 异步流 ② ( 使用 Flow 异步流持续获取不同返回值 | Flow 异步流获取返回值方式与其它方式对比 | 在 Android 中使用 Flow 异步流下载文件 )