深潜Kotlin协程(二十二):Flow的处理

Posted RikkaTheWorld

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深潜Kotlin协程(二十二):Flow的处理相关的知识,希望对你有一定的参考价值。

系列电子书:传送门


我们将 Flow 描述为一个管道,值在上面流动,流动时,这些值可以以不同的方式进行更改:删除、相乘、转换或合并。这些在 Flow 被创建到终端操作之间的所有操作称为Flow的处理。在本章中,我们将学习用于此目的的函数。

这里提供的函数可能会让你想起用于处理集合的函数,这并不是巧合,因为都代表了相同的概念。不同的是 flow 上的元素可以按时传递。

map

我们需要学习的第一个重要函数就是 map。它根据转换函数对流动的每个元素进行变换。假如你有一个整型数据流,你的 map 操作是计算这些数字的平方,那么最终流上的数据会是这些数字的平方。

suspend fun main() 
    flowOf(1, 2, 3) // [1, 2, 3]
        .map  it * it  // [1, 4, 9]
        .collect  print(it)  // 149

之后,我将使用上面所示的图片来可视化 flow 处理函数如何随时间改变元素。水平线表示时间,这条线上的元素是某个时间点该 flow 中被发射的元素。上面的一行表示操作之前的数据流,下面一行表示操作之后的数据流。这个图还可以用来表示使用多个操作,比如下图中的 mapfilter

大多数 Flow 处理函数,都很容易通过我们前面章节学习的工具来实现。要实现 map,我们可以使用 flow 构建器来创建一个新的 flow。然后,我们可以从前一个 flow 中收集元素,并发射出经过转换的元素。下面的实现是来自于 kotlin.coroutines 库中 map 的一个简化版本:

fun <T, R> Flow<T>.map(
    transform: suspend (value: T) -> R
): Flow<R> = flow  // 这里我们创建了一个 flow
    collect  value -> // 这里我们从接收者哪里收集数据
        emit(transform(value))
    

map 是一个非常受欢迎的函数。它的用法有解包或者将值转换成不同的类型。

// 这里我们使用 map 来从输入事件中获得用户操作
fun actionsFlow(): Flow<UserAction> =
    observeInputEvents()
        .map  toAction(it.code) 

// 这里我们把 User 转化成 UserJson
fun getAllUser(): Flow<UserJson> =
    userRepository.getAllUsers()
        .map  it.toUserJson() 

filter

下一个重要的函数是 filter,它返回一个仅包含原始 flow 中匹配了给定谓词的数据的 flow。

suspend fun main() 
    (1..10).asFlow() // [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
        .filter  it <= 5  // [1, 2, 3, 4, 5]
        .filter  isEven(it)  // [2, 4]
        .collect  print(it)  // 24


fun isEven(num: Int): Boolean = num % 2 == 0

使用 flow 构建器也可以非常容易地实现此函数。我们只需要引入一个 if 语句和谓词(替代了转换)。

fun <T> Flow<T>.filter(
    predicate: suspend (T) -> Boolean
): Flow<T> = flow  // 这里我们创建一个流
    collect  value -> // 这里我们从接收者那里接收元素
        if (predicate(value)) 
            emit(value)
        
    

filter 通常用于排除我们不需要的元素。

// 这里我们使用 filter 去过滤掉我们不想要的元素
fun actionsFlow(): Flow<UserAction> =
    observeInputEvents()
        .filter  isValidAction(it.code) 
        .map  toAction(it.code) 

take 和 drop

我们使用 take 来传递一定数量的元素。

suspend fun main() 
    ('A'..'Z').asFlow()
        .take(5) // [A, B, C, D, E]
        .collect  print(it)  // ABCDE

我们使用 drop 来忽略特定数量的元素。

suspend fun main() 
    ('A'..'Z').asFlow()
        .drop(20) // [U, V, W, X, Y, Z]
        .collect  print(it)  // UVWXYZ

它们低层是如何工作的?

我们已经看到了相当多的 flow 处理和生命周期函数。它们的实现非常简单,没有什么神奇的东西在里面。大多数这样的函数都可以用 flow 构建器,并用 collect 来实现。下面是一个简单的流处理示例,包括简化的 mapflowOf 实现:

suspend fun main() 
    flowOf('a', 'b')
        .map  it.uppercase() 
        .collect  print(it)  // AB


fun <T, R> Flow<T>.map(
    transform: suspend (value: T) -> R
): Flow<R> = flow 
    collect  value ->
        emit(transform(value))
    


fun <T> flowOf(vararg elements: T): Flow<T> = flow 
    for (element in elements) 
        emit(element)
    

如果内联 flowOfmap 函数,你将得到下面代码(我在 lambdas 上添加了标签和带有数字的注释)。

suspend fun main() 
    flow map@ // 1
        flow flowOf@ // 2
            for (element in arrayOf('a', 'b'))  // 3
                this@flowOf.emit(element) // 4
            
        .collect  value -> // 5
            this@map.emit(value.uppercase()) // 6
        
    .collect  // 7
        print(it) // 8
    

让我们一步一步的分析。

  1. 我们在注释1处启动一个 flow,并在注释7处调用了其 collect 函数
  2. 当我们启动收集时,我们会调用 @map(注释1)的 lambda 表达式,它在注释2处启动了另一个 flow 构建器,并在注释5处去收集。
  3. 所以此时,我们我们启动 @flowOn(注释2处)的 lambda 表达式,它迭代了包含 ‘a’ 和 ‘b’ 的数组。
  4. 第一个值 ‘a’ 在注释4被发射,发射到了注释5处,该 lambda 表达式将值转换成 ‘A’ ,并从 @map 的 collect 函数中发射出去,从而调用到 7 处的 lambda 表达式, 该值被打印了出来。
  5. 注释7结束了,会在注释6处恢复,因为注释6没有别的内容,所以它也结束了,所以我们在注释4处恢复了 @flowOf
  6. 我们继续迭代并在注释4处发射 ‘b’ ,因此我们调用注释5处的 lambda 函数,将值 转成成 ‘B’,并从 @mapcollect 函数中发射出去。该值就会在注释7处被收集,然后在注释8处被打印。
  7. 接着注释7结束,恢复到注释6处,也完成了。所以我们在注释4处恢复 @flowOf 。它也完成了。由于没有更多的内容,我们到达了 @map 的结尾,这样,我们在注释7恢复,并到达了 main 函数的结尾。

在大多数 flow 处理和生命周期函数中也会是相同的情况,所以理解这一点可以让我们更好的理解 flow 是如何运作的。

merge, zip 和 combine

接下来学习将两个流合并成一个流。有几种方法可以做到这一点。最简单的方法是将两个流中的元素合并成一个。无论 flow 的元素源自哪里,都不用做任何修改,为此,我们可以使用 merge 函数。

suspend fun main() 
    val ints: Flow<Int> = flowOf(1, 2, 3)
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)
    
    val together: Flow<Number> = merge(ints, doubles)
    print(together.toList())
    // [1, 0.1, 0.2, 0.3, 2, 3]
    // 或者 [1, 0.1, 0.2, 0.3, 2, 3]
    // 或者 [0.1, 1, 2, 3, 0.2, 0.3]
    // 或者其他的组合

重要的点是,当我们使用 merge 时,一个流的元素不需要等待另一个流上的元素。例如,在下面的例子中,来自第一个 flow 的元素被延迟,但这并不会影响来自第二个 flow 的元素。

suspend fun main() 
    val ints: Flow<Int> = flowOf(1, 2, 3)
        .onEach  delay(1000) 
    val doubles: Flow<Double> = flowOf(0.1, 0.2, 0.3)

    val together: Flow<Number> = merge(ints, doubles)
    together.collect  println(it) 

// 0.1
// 0.2
// 0.3
// (1 sec)
// 1
// (1 sec)
// 2
// (1 sec)
// 3

当有多个事件源都有相同的操作时,我们可以使用 merge

fun listenForMessages() 
    merge(userSentMessages, messagesNotifications)
        .onEach  displayMessage(it) 
        .launchIn(scope)

下一个函数是 zip,它对两个 flow 进行配对。我们需要定义一个函数来决定元素是如何配对的(配对后结果将在发射到新的 flow 中)。每个元素只能是一对一的配对,所以它需要等待和它配对的那一个。没有配对的元素将会丢失,因此,当一个 flow 的压缩完成时,新的 flow 也将完成(其它 flow 亦是如此)。

suspend fun main() 
    val flow1 = flowOf("A", "B", "C")
        .onEach  delay(400) 
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach  delay(1000) 
    flow1.zip(flow2)  f1, f2 -> "$f1_$f2" 
        .collect  println(it) 

// (1 sec)
// A_1
// (1 sec)
// B_2
// (1 sec)
// C_3

zip 函数让我想起了波兰的传统舞蹈 —— 波罗涅滋舞。这种舞蹈的一个特点是,一排成对的人从中间分开,然后他们再次相遇后会重新组合。

组合两个 flow 的最后一个重要函数是 combine。就像 zip 一样,它也是从元素中形成对,快的 flow 生成的元素必须等待较慢的那个 flow 生成的元素来产生第一对。然而,与波兰舞的相似之处到此为止。使用 combine 时,每个新元素都会替换它的前一个元素。如果第一对已经形成,一个新的元素将和来自另一个 flow 的前一个元素一起产生新的对。

注意, zip 需要配对,所以当第一个 flow 关闭时,zip 就关闭了。 而 combine 没有这样的限制,直到两个 flow 都关闭前,它将一直发射数据。

suspend fun main() 
    val flow1 = flowOf("A", "B", "C")
        .onEach  delay(400) 
    val flow2 = flowOf(1, 2, 3, 4)
        .onEach  delay(1000) 
    flow1.combine(flow2)  f1, f2 -> "$f1_$f2" 
        .collect  println(it) 

// (1 sec)
// B_1
// (0.2 sec)
// C_1
// (0.8 sec)
// C_2
// (1 sec)
// C_3
// (1 sec)
// C_4

当我们需要观察两个来源的变化时,通常会使用 combine。我们还可以向每个组合的 flow 添加初始值(以获得初始对)。

userUpdateFlow.onStart  emit(currentUser) 

一个典型的场景是当一个视图有两个数据源。例如,当一个通知同时依赖用户的当前状态和一些通知时,我们可以同时观察它们并结合它们的更改来更新视图。

userStateFlow
    .combine(notificationsFlow)  userState, notifications ->
        updateNotificationBadge(userState, notifications)
    
    .collect()

fold 和 scan

如果你使用过集合处理函数,你可能会了解 fold(折叠)。通过对每个元素(从初始值开始)应用将两个值合并为一个值的操作,将集合中的所有值合并成一个。

例如,如果初始值是0,操作是加法,那么结果是所有数字的和:我们首先取初始值0,然后,我们把第一个元素1加上去;在结果1上,我们加上第二个数字2;对结果3,我们加上第三个数字3;对结果6,我们加上最后一个数字4,这个操作的结果是10,也就是 fold 返回的结果。

fun main() 
    val list = listOf(1, 2, 3, 4)
    val res = list.fold(0)  acc, i -> acc + i 
    println(res) // 10
    val res2 = list.fold(1)  acc, i -> acc * i 
    println(res2) // 24

fold 是一个终端操作,它也可以用于flow,但是它会挂起直到 flow 完成(就像 collect)一样。

suspend fun main() 
    val list = flowOf(1, 2, 3, 4)
        .onEach  delay(1000) 

    val res = list.fold(0)  acc, i -> acc + i 
    println(res)

// (4 sec)
// 10

有一种替代 fold 函数是 scan。它是一个能产生所有中间值的中间操作:

fun main() 
    val list = listOf(1, 2, 3, 4)
    val res = list.scan(0)  acc, i -> acc + i 
    println(res) // [0, 1, 3, 6, 10]

scan 对于 flow 来说非常有用,因为它可以在接收到上一步产生的值后,立即产生一个新的值。

suspend fun main() 
    flowOf(1, 2, 3, 4)
        .onEach  delay(1000) 
        .scan(0)  acc, v -> acc + v 
        .collect  println(it) 

// 0
// (1 sec)
// 1
// (1 sec)
// 3
// (1 sec)
// 6
// (1 sec)
// 10

我们可以使用 flow 构建器和 collect 轻松实现 scan。我们首先发射出初始值,然后对每个新元素发出下一个值的累积结果。

fun <T, R> Flow<T>.scan(
    initial: R,
    operation: suspend (accumulator: R, value: T) -> R
): Flow<R> = flow 
    var accumulator: R = initial
    emit(accumulator)
    collect  value ->
        accumulator = operation(accumulator, value)
        emit(accumulator)
    

scan 典型的使用场景是当我们需要更新或更改 flow ,或我们需要一个对象,这个对象是这些改变的结果时。

val userStateFlow: Flow<User> = userChangesFlow
    .scan(user)  acc, change -> user.withChange(change) 

val messagesListFlow: Flow<List<Message>> = messagesFlow
    .scan(messages)  acc, message -> acc + message 

flatMapConcat, flatMapMerge 和 flatMapLatest

另一个著名的集合处理函数是 flatMap,它类似于 map,但是这个转换函数最终返回一个扁平化的集合。例如,你有一个部门列表,每个部门都有一个员工列表,那么你可以使用 flatMap 来列出所有部门的所有员工。

val allEmployees: List<Employee> = departments
    .flatMap  department -> department.employees 
    
val listOfListsOfEmployee: List<List<Employee>> = departments
    .map  department -> department.employees 

flatMap 是如何应用到 flow 上的呢?我们可以期望可以返回一个 flow,然后该 flow 应该是扁平化的。问题是流动的元素可以随时间扩散,那么,一个流上的元素是否应该等待另外一个流上的元素,还是应该同时处理它们?因为没有一个明确的答案,所以 Flow 没有 flatMap 函数,而是有 flatMapConcatflatMapMergeflatMapLatest

flatMapConcat 函数依次处理生成的 flow。因此,第二个 flow 可以在第一个 flow 完成时启动。在下面的例子中,我们使用字符 “A” “B” “C” 来创建一个 flow,它们产生的新 flow 会包括这些字符和数字,中间有1s的延迟。

fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach  delay(1000) 
    .map  "$it_$elem " 

suspend fun main() 
    flowOf("A", "B", "C")
        .flatMapConcat  flowFrom(it) 
        .collect  println(it) 

// (1 sec)
// 1_A
// (1 sec)
// 2_A
// (1 sec)
// 3_A
// (1 sec)
// 1_B
// (1 sec)
// 2_B
// (1 sec)
// 3_B
// (1 sec)
// 1_C
// (1 sec)
// 2_C
// (1 sec)
// 3_C

第二个函数 flatMapMerge 对我来说是最直观的,它同时处理 flow 生成的值。

fun flowFrom(elem: String) = flowOf(1, 2, 3)
    .onEach  delay(1000) 
    .map  "$it_$elem " 

suspend fun main() 
    flowOf("A", "B", "C")
        .flatMapMerge  flowFrom(it) 
        .collect  printl

以上是关于深潜Kotlin协程(二十二):Flow的处理的主要内容,如果未能解决你的问题,请参考以下文章

深潜Kotlin协程(二十):构建 Flow

深潜Kotlin协程(二十一):Flow 生命周期函数

深潜Kotlin协程(二十一):Flow 生命周期函数

深潜Kotlin协程(二十三 完结篇):SharedFlow 和 StateFlow

深潜Kotlin协程(十九):Flow 概述

深潜Kotlin协程(二十三 完结篇):SharedFlow 和 StateFlow