深潜Kotlin协程(十九):Flow 概述

Posted RikkaTheWorld

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深潜Kotlin协程(十九):Flow 概述相关的知识,希望对你有一定的参考价值。

系列电子书:传送门


Flow 表示的是一个用于异步计算的数据流Flow 接口本身只允许收集那些流动的元素,这也就是说每个元素只有到达流的末端时,我们才去处理它们(Flow 的 collect 类似于集合的 forEach)。

interface Flow<out T> 
    suspend fun collect(collector: FlowCollector<T>)

可以看到, collectFlow 中唯一的成员函数。其它的函数都被定义为扩展函数。这与 IterableSequence 类似,它们都只有 iterator 作为成员函数。

interface Iterable<out T> 
    operator fun iterator(): Iterator<T>


interface Sequence<out T> 
    operator fun iterator(): Iterator<T>

流 vs 其他表示数据的方式

对于使用 RxJavaReactor 的人来说,流的概念应该是他们最熟悉的,但对于其他不熟悉的人,可能需要更好的解释。

假设你需要一个函数返回多个值,如果这些值同时被提供出来,我们会使用像 ListSet 这样的集合。

fun allUsers(): List<User> =
    api.getAllUsers().map  it.toUser() 

这里的本质是 ListSet 表示的是一个完全计算的集合。处理这些值需要时间,所以我们需要等待所有的值处理好,然后才能拿到它们。

fun getList(): List<Int> = List(3) 
    Thread.sleep(1000)
    "User$it"


fun main() 
    val list = getList()
    println("Function started")
    list.forEach  println(it) 

// (3 sec)
// Function started
// User0
// User1
// User2

如果元素是一个接一个出现,我们会使用的一种方法是 Sequence

fun getSequence(): Sequence<String> = sequence 
    repeat(3) 
        Thread.sleep(1000)
        yield("User$it")
    

fun main() 
    val list = getSequence()
    println("Function started")
    list.forEach  println(it) 

// Function started
// (1 sec)
// User0
// (1 sec)
// User1
// (1 sec)
// User2

当计算可能是 CPU 密集型(比如计算复杂的结果)或阻塞的(比如读取文件)时候,序列是一个合适的按需计算的数据流。但是,你必须要知道序列的终端操作(如 forEach)是不会挂起的,因此序列构建器中的任何挂起都意味着阻塞等待线程来处理这个值。这就是为什么在 sequence 构建器的作用域中,除了在 SequenceScope 接收者上调用的函数(yieldyieldAll)外,不能使用任何挂起函数。

fun getSequence(): Sequence<String> = sequence 
    repeat(3) 
        delay(1000) // 这里编译错误
        yield("User$it")
    

引入这种机制是为了防止序列被误用。例如,有人可能希望使用分页的方式从 Http 端口获取所有的用户列表,直到接收到空白的数据。
即使上面的例子可以通过编译,它也不会是正确的,因为终端操作(如 forEach)将阻塞线程而不是挂起线程,这可能会导致意外的线程阻塞。

// 不要这样做,我们应该使用 Flow 来代替 Sequence
fun allUsersSequence(
    api: UserApi
): Sequence<User> = sequence 
    var page = 0
    do 
        val users = api.takePage(page++) // 挂起了,所以编译错误
        yieldAll(users)
     while (!users.isNullOrEmpty())

我希望你已经了解到线程阻塞可能是危险的,会导致意想不到的情况,为了更清楚地说明这一点,看一下下面的示例,我们使用 Sequence,因此它的 forEach 是一个阻塞操作。这就是为什么在同一个线程上启动的协程会等待,一个协程的执行会阻塞另一个协程的执行:

fun getSequence(): Sequence<String> = sequence 
    repeat(3) 
        Thread.sleep(1000)
        // 就算这里能使用 delay(1000) ,结果也还是一样的
        yield("User$it")
    


suspend fun main() 
    withContext(newSingleThreadContext("main")) 
        launch 
            repeat(3) 
                delay(100)
                println("Processing on coroutine")
            
        
        val list = getSequence()
        list.forEach  println(it) 
    

// (1 sec)
// User0
// (1 sec)
// User1
// (1 sec)
// User2
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine

在这种情况下,我们应该使用 Flow 而不是 Sequence。它完全支持协程。它的构建器和操作都是可挂起,并且支持结构化并发和适当的异常处理。我们将在下一章中解释这些内容。但现在让我们看看它对这个案例有什么帮助。

fun getFlow(): Flow<String> = flow 
    repeat(3) 
        delay(1000)
        emit("User$it")
    


suspend fun main() 
    withContext(newSingleThreadContext("main")) 
        launch 
            repeat(3) 
                delay(100)
                println("Processing on coroutine")
            
        
        val list = getFlow()
        list.collect  println(it) 
    

// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (0.1 sec)
// Processing on coroutine
// (1 - 3 * 0.1 = 0.7 sec)
// User0
// (1 sec)
// User1
// (1 sec)
// User2

Flow 应该用于需要使用协程的数据流。例如,它可以用于生成一个从 API 页面逐页获取的用户流。例如,如果我们调用 allUserFlow(api).first(),我们将获取到第一页;如果我们调用 allUserFlow(api).toList() ,我们将获取所有数据;如果我们调用 allUserFlow(api).find it.id == id ,我们将一直拉取页面数据,直到找到我们想要找到的页面。

fun allUsersFlow(
    api: UserApi
): Flow<User> = flow 
    var page = 0
    do 
        val users = api.takePage(page++) // 挂起了
        emitAll(users)
     while (!users.isNullOrEmpty())

Flow 的特性

Flow 的终端操作(如 collect)将挂起一个协程,而不是阻塞线程。它们还支持其它协程功能,例如异常的处理。Flow 处理可以被取消,并且可以在外部支持结构化并发。 flow 构建器不会挂起,也不需要任何作用域。

下面的示例展示了 CoroutineName 上下文如何从集合传递到 flow 构建器中的。它还表明,launch 的取消也会导致 flow 的处理被取消。

// 注意,该函数不会挂起,而且不需要任何的 CoroutineScope
fun usersFlow(): Flow<String> = flow 
    repeat(3) 
        delay(1000)
        val ctx = currentCoroutineContext()
        val name = ctx[CoroutineName]?.name
        emit("User$it in $name")
    


suspend fun main() 
    val users = usersFlow()
    withContext(CoroutineName("Name")) 
        val job = launch 
            // collect 是挂起的
            users.collect  println(it) 
        
        launch 
            delay(2100)
            println("I got enough")
            job.cancel()
        
    

// (1 sec)
// User0 in Name
// (1 sec)
// User1 in Name
// (0.1 sec)
// I got enough

Flow 命名法

  • Flow 需要从某个地方开始,它通常从一个流构建器开始,从不同的对象或从某些 helper 函数开始,最重要的选项在下一章中解释
  • Flow 上最后一个操作被称为终端操作,这是非常重要的,因为它通常是唯一的挂起函数,或需要协程作用域的操作。典型的终端操作是 collect。然而,还有其它终端操作,我会在后面的章节中讲解
  • 在开始操作和终端操作之间,我们可能有中间操作,每个操作都以某种方式修改流,我们将在Flow的生命周期处理Flow的章节中学习不同的中间操作

实际用例

实践表明,我们更多时候需要的是 flow,而不是 channel。如果请求数据流,我们通常希望是按需请求的。如果你需要观察某些东西,例如数据库中的更改或者来自 UI 部件的感知,你可能希望每个观察者都能接收到这些事件。当没有人要观察时,你也要停止监听。这就是为什么在所有这些情况下,使用 flow 会比使用 channel 更好(尽管在某些情况下,我们将这两者混合使用)。

flow 最典型的用法包括:

  • 接收从 Server 连通通道中发送的消息,如 WebSocket、通知等
  • 观察用户的操作,如文本更改或点击
  • 接收来自传感器或设备的其他信息的更新,如其位置或方向
  • 观察数据库的变化

下面是我们如何使用 Room 库来观察 SQL 数据库的变化:

@Dao
interface MyDao 
@Query("SELECT * FROM somedata_table")
    fun getData(): Flow<List<SomeData>>

让我们看一些示例,看看如何使用 flow 来处理来自 API 的响应流。首先,假设你实现了聊天功能,其中消息通过 Server 通道和通知发送。将两个数据源作为一个流,将它们合并在一起,然后用该流来更新视图,这是很方便的。另一个例子可能是用它来提供越来越好的响应结果。例如,当我们在 SkyScanner 上搜索最佳航班时,有些报价很快就会到达,但随着时间的推移,会有更多更好的报价达到,因此,你会看到越来越好的结果。这也是使用 flow 的一个很好的例子。

除了这些情况,对于不同的并发处理, flow 也是一个有用的工具。例如,假设你有一个卖家列表,你需要获取每个卖家的报价。我们已经知道可以使用 async 在集合处理中实现这一点:

suspend fun getOffers(
    sellers: List<Seller>
): List<Offer> = coroutineScope 
    sellers
        .map  seller ->
            async  api.requestOffers(seller.id) 
        
        .flatMap  it.await() 

上面的方法在很多情况下是正确的,但它有一个缺点:当卖家列表很大时,一次发送这么多请求对我们和服务器都没有什么好处。当然,这可以在服务器中进行限频或限流,但我们也希望在客户端控制它,因此我们可以使用 Flow。在这种情况下,为了将并发调用的数量限制在20个,我们可以使用 flaotMapMerge,并将最大并发数 concurrency 修改为 20:

suspend fun getOffers(
    sellers: List<Seller>
): List<Offer> = sellers
    .asFlow()
    .flatMapMerge(concurrency = 20)  seller ->
        suspend  api.requestOffers(seller.id) .asFlow()
    
    .toList()

对 Flow 而不是集合进行操作,可以让我们对并发行为、上下文、异常等进行更多的控制。我们将在下一章中探索这些功能,这就是(以我的经验) flow 最有用的地方。我希望在我们介绍了它的所有不同功能之后,你能清楚的了解这一点。

最后,因为更喜欢响应式编程的风格,一些团队倾向使用响应流而不是挂起函数。这种风格在 android 上很流行,其中 RxJava 就很主流,但现在 Flow 通常被视为更好的选择。

正如你所看到的,flow 有相当多的用例。在一些项目中,它们会被普遍使用,而在另一些项目中,它们只会被偶尔使用。但我希望你能知道它是有用的,值得学习的。

总结

在本章中,我们介绍了 Flow 的概念。它表示支持协程(不同的序列)的异步数据流。在相当多的用例中,flow 是有用的。

以上是关于深潜Kotlin协程(十九):Flow 概述的主要内容,如果未能解决你的问题,请参考以下文章

深潜Kotlin协程(二十):构建 Flow

深潜Kotlin协程(二十):构建 Flow

深潜Kotlin协程(二十二):Flow的处理

深潜Kotlin协程(二十一):Flow 生命周期函数

深潜Kotlin协程(二十一):Flow 生命周期函数

深潜Kotlin协程(十八):冷热数据流