Kotlin Flow响应式编程,操作符函数进阶

Posted guolin

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin Flow响应式编程,操作符函数进阶相关的知识,希望对你有一定的参考价值。

本文同步发表于我的微信公众号,扫一扫文章底部的二维码或在微信搜索 郭霖 即可关注,每个工作日都有文章更新。

大家好,今天原创。

在上一篇原创文章当中,我跟大家说了会开启一个新的系列,讲一讲Kotlin Flow响应式编程从入门到进阶的内容。

总共计划是用三篇文章讲完,而本篇则是这个系列的第二篇文章。如果你还没有看过前面的基础知识入门的话,可以先去参考这里 Kotlin Flow响应式编程,基础知识入门

本篇文章我打算着重讲解一下操作符函数的相关内容。什么是操作符函数?如果你熟悉RxJava,那么对于操作符函数一定不会陌生。如果你不熟悉RxJava,那么操作符函数就是那个让RxJava如此难学的元凶。

准确来说,RxJava的操作符函数不是难,而是多。之前Google在刚推出自家LiveData时也曾调侃过,我们的操作符函数只有两个,可不像某些库有上百万个那么多。

我相信应该没有任何一个人能够熟练掌握RxJava的所有操作符函数,这一点越是RxJava的老手应该越是深有体会。正确的做法是,只去熟记那些最常用的操作符函数即可,剩下绝大多数的操作符函数都是不太能用得到的,或者需要用到时再去查阅文档学习即可。

那么Kotlin Flow的操作符函数也是类似的,虽然它没有RxJava那么多,但是着实也不少。本篇文章我会尽可能将最常用的操作符函数全部覆盖到,那么学完本篇文章之后,在绝大部分Kotlin Flow的操作符函数使用场景中,你应该都可以比较得心应手了。

另外,我对接下来即将介绍的所有操作符函数按照功能相似度以及难易程度进行了分组,会按照先易后难的原则一一进行介绍。

话不多说,开始走起。


0. setup

我们会以全程实践的方式来学习文中所有的操作符函数。因此,将实践环境提前搭建好是必不可少的。

首先创建一个android项目,并在项目中添加如下依赖库:

dependencies 
    ...
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.6.1"
    implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.6.1"

由于Flow以及操作符函数并不依赖于Android项目,因此我们并不必非得在Android项目中来进行实践。简单起见,我们只需要在一个Kotlin类当中来实践本篇文章的内容即可。

创建一个Flow.kt类,并在其中定义一个main()函数,如下所示:

fun main() 
    runBlocking 
        
    

这里我们在main()函数中添加了一个runBlocking代码块,它的作用是提供协程作用域给稍后的Flow使用,并且在代码块中的所有代码执行完之前阻塞当前线程。

因此,我们把后续的所有例子都放到runBlocking代码块中去执行就可以了。

另外当你定义好了main()函数之后,你会发现它的左边会出现一个运行箭头:

只要点击这个箭头就可以执行main()函数中的代码了。

好了,实践环境到这里已经搭建完成,接下来正式开始学习操作符函数吧。


1. map

刚才我们有说会按照先易后难的原则进行学习,那么毫无疑问,map一定是最容易的操作符函数了。

在很多编程语言里面都有内置的map函数,甚至Kotlin自己就有。RxJava中也有map这个操作符函数,所以我们在Flow中第一个介绍它简直就是理所应当的事情。

那么顾名思义,map就是用于将一个值映射成另一个值,具体映射的规则我们则可以在map函数中自行定义。

这里我通过一个简单的例子就能带大家快速理解map函数,这种简单的操作符函数没必要花费太多时间。

fun main() 
    runBlocking 
        val flow = flowOf(1, 2, 3, 4, 5)
        flow.map 
            it * it
        .collect 
            println(it)
        
    

首先,我们通过flowOf函数构造了一个flow对象,里面依次发送了1, 2, 3, 4, 5这几个值。

那么如果直接对这个flow去collect,我们理所应当打印出来的也是1, 2, 3, 4, 5这几个值。

但是这里在collect之前,我们调用了map操作符函数,并在里面做了一下平方运算。因此collect之后的结果就会变成1, 4, 9, 16, 25。

验证一下,打印结果如下图所示:

这就是map操作符函数,非常简单,相信你一下子就已经理解了。


2. filter

filter也是一个非常简单的操作符函数。顾名思义,它是用来过滤掉一些数据的。

Flow当中的操作符函数既可以单独使用,也可以结合其他操作符函数一起使用。

这里我们通过结合filter和map这两个操作符函数,来快速演示一下用法,你就立刻能掌握了。

fun main() 
    runBlocking 
        val flow = flowOf(1, 2, 3, 4, 5)
        flow.filter  
            it % 2 == 0
        .map 
            it * it
        .collect 
            println(it)
        
    

这里在filter函数中指定了一个条件,判断数据是否是偶数。

因而,flow中的数据只有是偶数的情况下才会继续向下传递,奇数则会被filter函数过滤掉。

验证一下结果,如下图所示:

非常好理解,现在filter操作符函数你也已经掌握了。


3. onEach

onEach又是一个非常简单的操作符函数,它甚至比map和filter还要简单直白,因为它就是用来遍历每一条数据的。

比如说我们想把flow中的每一条数据打印出来,借助onEach函数就可以这样写:

fun main() 
    runBlocking 
        val flow = flowOf(1, 2, 3, 4, 5)
        flow.onEach 
            println(it)
        .collect 
        
    

可能有的朋友会说,这不是脱了裤子放屁嘛,我在collect函数中就可以把数据打印出来了,还需要借助onEach函数干什么。

确实,但是collect函数中打印出的是最终的结果。如果你想要查看某个中间状态时flow的数据状态,借助onEach就非常有用了。

fun main() 
    runBlocking 
        val flow = flowOf(1, 2, 3, 4, 5)
        flow.filter 
            it % 2 == 0
        .onEach 
            println(it)
        .map 
            it * it
        .collect 
        
    

可以看到,这里我们将onEach函数插入到了filter函数和map函数之间。那么现在onEach所处的就是一个经过了偶数过滤,但是还没有经过乘积运算的一个状态。

验证一下结果,如下图所示:

没有问题,结果正如我们想象的那样,所有偶数都被打印出来了。


4. debounce

最简单的操作符函数差不多就这些,接下来要学习稍微有些难度的操作符函数了。

debounce函数可以用来确保flow的各项数据之间存在一定的时间间隔,如果是时间点过于临近的数据只会保留最后一条。

这个函数在某些特定场景下特别有用。

想象一下我们正在Edge浏览器的地址栏中输入搜索关键字,浏览器的地址栏下方通常都会给出一些搜索建议。

这些搜索建议是根据用户当前输入的内容通过发送网络请求去服务器端实时获取的。

但如果用户每输入一个字符就立刻发起一条网络请求,这是非常不合理的设计。

原因很简单,网络请求是不可能做到无延时响应的,而用户的打字速度通常都比较快。如果用户每输入一个字符都立刻发起一条网络请求,那么很有可能用户输完了第3个字符之后,对应第1个字符的网络请求结果才刚刚返回回来,而此时的数据早已是无效数据了。

正确的做法是,我们不应该在用户每输入一个字符时就立刻发起一条网络请求,而是应该在一定时间的停顿之后再发起网络请求。如果用户停顿了一段时间,很有可能说明用户已经结束了快速输入,开始期待一些搜索建议了。这样就能有效地避免发起无效网络请求的情况。

而要实现这种功能,使用debounce操作符函数就会非常简单,它就是为了这种场景而设计的。

还是通过代码来举例演示:

fun main() 
    runBlocking 
        flow 
            emit(1)
            emit(2)
            delay(600)
            emit(3)
            delay(100)
            emit(4)
            delay(100)
            emit(5)
        
        .debounce(500)
        .collect 
            println(it)
        
    

可以看到,我们调用了debounce函数,并且传入了500作为参数,意义就是说只有两条数据之间的间隔超过500毫秒才能发送成功。

这里使用emit()函数依次发送了1、2、3、4、5这几条数据。其中,1和2是连续发送的,2和3之间存在600毫秒的间隔,因此2可以发送成功。3和4之间、4和5之间间隔只有100毫秒,因此都无法发送成功。5由于是最后一条数据,因此可以发送成功。

那么打印结果应该是2和5:

这就是debounce操作符函数的用法了。


5. sample

sample操作符函数和debounce稍微有点类似,它们的用法也比较接近,同样都是接收一个时间参数。

sample是采样的意思,也就是说,它可以从flow的数据流当中按照一定的时间间隔来采样某一条数据。

这个函数在某些源数据量很大,但我们又只需展示少量数据的时候比较有用。

比如说视频网站的弹幕功能,理论上来说每个时间点用户发送的弹幕数量可以是无限多的,但是视频网站又不可能把每条弹幕都展示出来,不然的话弹幕和视频都没法看了。

因此,这个时候就需要对数据进行采样。

我们来模拟一下这种场景,假设某个视频的播放量很大,每时每刻都有无数人在发送弹幕,但是我们每秒钟最多只允许显示1条弹幕,代码就可以这样写:

fun main() 
    runBlocking 
        flow 
            while (true) 
                emit("发送一条弹幕")
            
        
        .sample(1000)
        .flowOn(Dispatchers.IO)
        .collect 
            println(it)
        
    

这里我们在flow构建函数中写了一个死循环,不断地在发送弹幕,那么这个弹幕的发送量无疑是巨大的。

而接下来我们借助sample函数进行数据采集,每秒钟只取一条弹幕,这样就轻松满足了前面说的弹幕采样率的要求。

另外还有一点需要注意,由于flow是通过死循环不断发送的,我们必须调用flowOn函数,让它在IO线程里去执行,否则我们的主线程就一直被卡死了。

接下来运行一下程序来看看结果吧:

可以看到,虽然弹幕的发送量无限大,但是我们每秒钟只会打印出一条弹幕,这就是sample操作符函数的用法了。


6. reduce

刚才学习的几个操作符函数最终都还是要通过collect函数来收集结果的。

接下来我们学习两个不需要借助collect函数,自己就能终结整个flow流程的操作符函数,这种操作符函数也被称为终端操作符函数。

首先来看reduce函数。我个人认为reduce函数还是比较好理解的,它的基本公式如下:

flow.reduce  acc, value -> acc + value 

其中acc是累积值的意思,value则是当前值的意思。

也就是说,reduce函数会通过参数给我们一个Flow的累积值和一个Flow的当前值,我们可以在函数体中对它们进行一定的运算,运算的结果会作为下一个累积值继续传递到reduce函数当中。

举一个更加具体点的例子,我们上学时学等差数列都会讲这个故事,高斯的老师让全班同学计算从1加到100的结果。

今天我们不需要借助等差数列,只需要借助reduce函数就可以立刻算出结果了:

fun main() 
    runBlocking 
        val result = flow 
            for (i in (1..100)) 
                emit(i)
            
        .reduce  acc, value -> acc + value
        println(result)
    

这里需要注意的是,reduce函数是一个终端操作符函数,它的后面不可以再接其他操作符函数了,而是只能获取最终的运行结果。

那么运算结果毫无疑问是5050:



7. fold

fold函数和reduce函数基本上是完全类似的,它也是一个终端操作符函数。

主要的区别在于,fold函数需要传入一个初始值,这个初始值会作为首个累积值被传递到fold的函数体当中,它的基本公式如下:

flow.fold(initial)  acc, value -> acc + value 

总体区别就这么多,所以我感觉fold函数并没有什么好讲的,它和reduce函数具体用谁只取决于你编程时的业务逻辑需求。

但是其实reduce函数和fold函数并不是只能用作数值计算,相反它们可以作用于任何类型的数据。因此,这里我就用fold函数来演示一个字符串拼接的功能吧:

fun main() 
    runBlocking 
        val result = flow 
            for (i in ('A'..'Z')) 
                emit(i.toString())
            
        .fold("Alphabet: ")  acc, value -> acc + value
        println(result)
    

这里我们将字母A-Z进行了拼接,另外fold函数要求传入一个初始值,那么我们就再添加一个Alphabet的头部,打印结果如下所示:


8. flatMapConcat

操作符函数到了flatMap系列,难度就开始骤升了。

其实为了搞明白这几个flatMap操作符函数的用法,我也去参考了不少资料,但是基本上没有哪个能够讲得非常简单易懂的,因此我自己也学得很吃力。

但是没办法,本来它们就是有一定难度的操作符函数,再好的文笔也不可能将这些难度全部抹平了。

那么这里我就按照我的方式,用尽可能简单的讲解和代码演示来向大家介绍flatMap系列操作符函数的用法。

这里我一直在说系列,是因为目前一共有3种以flatMap开头的操作符函数,分别是flatMapConcat、flatMapMerge和flatMapLatest。

这3种操作符函数我们都会介绍,先从flatMapConcat开始。

前面我们所学的所有内容都是在一个flow上进行操作,而从flatMap开始,要上升到对两个flow进行操作了。

flatMap的核心,就是将两个flow中的数据进行映射、合并、压平成一个flow,最后再进行输出。

举例讲解可能会更加容易理解一些,观察如下代码:

fun main() 
    runBlocking 
        flowOf(1, 2, 3)
            .flatMapConcat 
                flowOf("a$it", "b$it")
            
            .collect 
                println(it)
            
    

这里的第一个flow会依次发送1、2、3这几个数据。

然后在flatMapConcat函数中,我们传入了第二个flow。

第二个flow会依次发送a、b这两个数据,但是在a、b的后面又各自拼接了一个it。

这个it就是来自第一个flow中的数据。所以,flow1中的1、2、3会依次与flow2中的a、b进行组合,这样就能组合出a1、b1、a2、b2、a3、b3这样几条数据。

而collect函数最终收集到的就是这些组合后的数据。

验证一下,打印结果如下所示:

这样我们就弄明白示例中flatMapConcat函数的用法了,但是在实际的业务中,这个函数又有什么具体的应用场景呢?

不知道你有没有遇到过这样的情况,请求一个网络资源时需要依赖于先去请求另外一个网络资源。

比如说我们想要获取用户的数据,但是获取用户数据必须要有token授权信息才行,因此我们得先发起一个请求去获取token信息,然后再发起另一个请求去获取用户数据。

这种两个网络请求之间存在依赖关系的代码其实挺不好写的,稍微一不注意就可能会陷入嵌套地狱:

public void getUserInfo() 
    sendGetTokenRequest(new Callback() 
        @Override
        public void result(String token) 
            sendGetUserInfoRequest(token, new Callback() 
                @Override
                public void result(String userInfo) 
                    // handle with userInfo
                
            );
        
    );

可以看出来,网终请求代码由于需要开线程执行,然后在回调中获取结果,通常会嵌套得比较深。

而这个问题我们就可以借助flatMapConcat函数来解决。

假设我们将sendGetTokenRequest()函数和sendGetUserInfoRequest()函数都使用flow的写法进行改造:

fun sendGetTokenRequest(): Flow<String> = flow 
    // send request to get token
    emit(token)


fun sendGetUserInfoRequest(token: String): Flow<String> = flow 
    // send request with token to get user info
    emit(userInfo)

那么接下来就可以用flatMapConcat函数将它们串连成一条链式执行的任务了:

fun main() 
    runBlocking 
        sendGetTokenRequest()
            .flatMapConcat  token ->
                sendGetUserInfoRequest(token)
            
            .flowOn(Dispatchers.IO)
            .collect  userInfo ->
                println(userInfo)
            
    

当然,这个用法并不仅限于只能将两个flow串连成一条链式任务,如果你有更多的任务需要串到这同一条链上,只需要不断连缀flatMapConcat即可:

fun main() 
    runBlocking 
        flow1.flatMapConcat  flow2 
             .flatMapConcat  flow3 
             .flatMapConcat  flow4 
             .collect  userInfo ->
                 println(userInfo)
             
    

可以看到,这种写法,不管串连多少任务,都可以用完全平级的写法搞定,完全不会遇到之前嵌套地狱的困扰。

不知道我这样讲解flatMapConcat函数,是不是已经比较清楚了?


9. flatMapMerge

理解了flatMapConcat函数,再来看flatMapMerge函数会比较容易一些。

很多人觉得flatMap这几个操作符函数难以理解,其中一个原因就是,不管代码怎么写,flatMapConcat和flatMapMerge的效果好像都是一样的。

没错,如果只是用我们上面学习的代码示例,你会发现不管是用flatMapConcat还是flatMapMerge,最终的结果都是相同的。

这两个函数最主要的区别其实就在字面上。concat是连接的意思,merge是合并的意思。连接一定会保证数据是按照原有的顺序连接起来的,而合并则只保证将数据合并到一起,并不会保证顺序。

因此,flatMapMerge函数的内部是启用并发来进行数据处理的,它不会保证最终结果的顺序。

当然,刚才我们所使用的示例并不能演示出这种场景,下面我来对代码稍微进行一下改造:

fun main() 
    runBlocking 
        flowOf(300, 200, 100)
            .flatMapConcat 
                flow 
                    delay(it.toLong())
                    emit("a$it")
                    emit("b$it")
                
            
            .collect 
                println(it)
            
    

变化主要在于,我将第一个flow发送的数据改成了300、200、100。然后第二个flow中,在发送数据之前,我们要先去delay相对应的毫秒数。

现在运行的结果你觉得会是什么样子呢?

我们来看看吧:

可以看到,最终的结果仍然是按照flow1中数据发送的顺序输出的,即使第一个数据被delay了300毫秒,后面的数据也没有优先执行权。这就是flatMapConcat函数所代表的涵义。

而到了这里,flatMapMerge函数的区别也就呼之欲出了,它是可以并发着去处理数据的,而并不保证顺序。那么哪条数据被delay的时间更短,它就可以更优先地得到处理。

将flatMapConcat函数替换成flatMapMerge函数,如下所示:

fun main() 
    runBlocking 
        flowOf(300, 200, 100)
            .flatMapMerge 
                flow 
                    delay(it.toLong())
                    emit("a$it")
                    emit("b$it")
                
            
            .collect 
                println(it)
            
    

现在重新运行一下程序:

这两个操作符函数的区别,相信你已经掌握了吧?


10. flatMapLatest

终于到了flatMap系列的最后一个操作符函数。

掌握了前面两个flatMap函数,再来看flatMapLatest函数就不会觉得很难了。

它的作用和其他两个flatMap函数都是类似的,也是把两个flow合并、压平成一个flow。它的行为,和我们在 Kotlin Flow响应式编程,基础知识入门 这篇文章中学到的collectLatest函数是比较接近的。

因此我们把这几个知识点稍微融合一下就能理解flatMapLatest函数了。

先来回顾一下collectLatest函数的特性,它只接收处理最新的数据。如果有新数据到来了而前一个数据还没有处理完,则会将前一个数据剩余的处理逻辑全部取消。

flatMapLatest函数也是类似的,flow1中的数据传递到flow2中会立刻进行处理,但如果flow1中的下一个数据要发送了,而flow2中上一个数据还没处理完,则会直接将剩余逻辑取消掉,开始处理最新的数据。

我们还是通过一段代码来演示一下吧:

fun main() 
    runBlocking 
        flow 
            emit(1)
            delay(150)
            emit(2)
            delay(50)
            emit(3)
        .flatMapLatest 
            flow 
                delay(100)
                emit("$it")
            
        
        .collect 
            println(it)
        
    

这里我们在flow1中依次发送了1、2、3这几条数据。其中,1和2之间间隔了150毫秒,2和3之间间隔了50毫秒。

而在flow2中,每次处理数据需要消耗100毫秒。

那么由此我们可以分析出,当flow1中的第2条数据发送过来时,flow2中的第1条数据肯定已经处理完了。但是当flow1中的第3条数据发送过来时,flow2中的第2条数据并没有处理完。那么根据collectLatest函数的规则,这条数据的剩余处理逻辑会被取消掉。因此,2不会被打印出来。

最终我们看到的打印结果应该是1和3:

flatMap操作符函数系列到此结束。


11. zip

和flatMap函数有点类似,zip函数也是作用在两个flow上的。不过,它们的适用场景完全不同。

使用zip连接的两个flow,它们之间是并行的运行关系。这点和flatMap差别很大,因为flatMap的运行方式是一个flow中的数据流向另外一个flow,是串行的关系。

我们先来看一下zip函数的基本用法:

fun main() 
    runBlocking 
        val flow1 = flowOf("a", "b", "c")
        val flow2 = flowOf(1, 2, 3, 4, 5)
        flow1.zip(flow2)  a, b ->
            a + b
        .collect 
            println(it)
        
    

这里使用zip函数连接了两个flow,并且在zip的函数体中将两个flow中的数据进行了拼接。

但是需要注意的是,这两个flow中的数据量并不相同,第一个flow中有3个数据,第二个flow中有5个数据。

那么zip函数的规则是,只要其中一个flow中的数据全部处理结束就会终止运行,剩余未处理的数据将不会得到处理。因此,flow2中的4和5这两个数据会被舍弃掉。

运行一下程序,结果如下图所示:

但是我们又如何证明flow1和flow2之间是并行运行的呢?

这里我稍微对代码进行一下改造,把运行时间打印出来就知道了:

fun main() 
    runBlocking 
        val start = System.currentTimeMillis()
        val flow1 = flow 
            delay(3000)
            emit("a")
        
        val flow2 = flow 
            delay(2000)
            emit(1)
        
        flow1.zip(flow2)  a, b ->
            a + b
        .collect 
            val end = System.currentTimeMillis()
            println("Time cost: $end - startms")
        
    

可以看到,我们在flow1中delay了3秒钟,flow2中delay了2秒钟。

如果它们之间是串行关系的话,那么最终的总耗时一定是5秒以上。

现在重新运行一下程序,看一看结果如何:

结果是3036毫秒。由此可以证明flow1和flow2之间是并行的关系,最终的总耗时取决于运行耗时更久的那个flow。

那么zip函数具体又有什么应用场景呢?

想象一下如下需求,我们正在开发一个天气预报应用,需要去一个接口请求当前实时的天气信息,还需要去另一个接口请求未来7天的天气信息。

这两个接口之间并没有先后依赖关系,但是却需要两个接口同时返回数据之后再将天气信息展示给用户。

如果我们先去请求当前实时的天气信息,等得到数据响应之后再去请求未来7天的天气信息,那效率就会比较低了,因为这两件事情很明显可以同时去做。

而zip函数就刚好完美贴合这种应用场景,使用zip函数模拟上述场景的代码示例如下:

fun sendRealtimeWeatherRequest(): Flow<String> = flow 
    // send request to realtime weather
    emit(realtimeWeather)


fun sendSevenDaysWeatherRequest(): Flow<String> = flow 
    // send request to get 7 days weather
    emit(sevenDaysWeather)


fun main() 以上是关于Kotlin Flow响应式编程,操作符函数进阶的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin Flow响应式编程,操作符函数进阶

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Kotlin 协程Flow 异步流 ⑥ ( 调用 Flow#launchIn 函数指定流收集协程 | 通过取消流收集所在的协程取消流 )

Kotlin Flow响应式编程,StateFlow和SharedFlow

Kotlin Flow响应式编程,StateFlow和SharedFlow

Kotlin Flow响应式编程,基础知识入门