协程中的流

Posted ImportSUC

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了协程中的流相关的知识,希望对你有一定的参考价值。

协程中的异步流 基础

一.什么是异步流

可以连续返回多个值

与集合的区别:集合可以返回多个计算好的值,但是只能一次性返回

与Rxjava 的流是同一个概念

二.如何创建异步流

1.最基础的流构建器 flow{}

  • flow { … } 构建块中的代码可以挂起,不再需要 suspend 修饰符。
  • emit 发射, collect 收集。flow 构建器中的代码直到流被收集的时候才运行
  • 不阻塞线程
  • collect在哪个协程中调用,如果没有特殊操作,则flow代码块也在相同协程中运行

``

fun simple(): Flow<Int> = flow {
    println("Flow started")
    for (i in 1..3) {
        kotlinx.coroutines.delay(1000)
        emit(i)
    }
}
fun main() = runBlocking {
     println("Calling simple function...")
     var flow = simple()
     println("Calling collect...")
     flow.collect { value -> println(value) }
     println("Calling collect again...")
     flow.collect { value -> println(value) }
}

结果显示

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
end

2.flowof 构建器

定义一个发射固定值集的流

private fun simple2() = runBlocking {
    flowOf(1, 2, 3).collect { value -> println(value) }
}

3. asFlow() 扩展函数

使用asFlow() 扩展函数,可以将各种结合转换为流

private fun simaple3() = runBlocking {
    listOf(1, 2, 3).asFlow().collect { value -> println(value) }
}

三.操作符

1.filter , map 等过渡操作符

应用于上游,作用于下游

suspend fun performRequest(request: Int): String {
    delay(1000) 
    return "response $request"
}
private suspend fun simple5() {
    listOf(1, 2, 3).asFlow().filter { it > 2 }.map { performRequest(it) }
    .collect { println(it) }
}

结果显示

response 3

2.take限长操作符

当满足条件时,Flow 会被取消,此时会抛出异常。需要用try{} 来处理

private fun simple4(): Flow<Int> = flow {
    try {
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
    } catch (e: Exception) {
        e.printStackTrace()
    } finally{
      
    }

}
simple4().take(1).collect { it -> println(it) }

结果显示:

1
kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed

3.末端流操作符

  • 在流上用于启动流收集的挂起函数
  • collect 是最基础的末端操作符
  • toListtoSet。转化成各种集合
  • first 取第一个值
  • single 确保只发送一个值,否则就报错:java.lang.IllegalStateException: Expected only one element
fun simple6() = runBlocking {
    var first = listOf(1, 2, 3).asFlow().map { it + 1 }.first { it > 2 }
    println(first)
  
    var single = listOf(1).asFlow().map { it + 1 }.single()
    println(single)
}

4.flowOn 切换上下文

默认的,flow { ... } 构建器中的代码运行在相应流的收集器提供的上下文中

如果想让流运行在其他协程中,调用flowOn,创建一个新的协程供函数运行

fun simple7(): Flow<Int> = flow {
    for (i in 1..3) {
        Thread.sleep(100) 
        println("Emitting $i" + ",currentThread" + Thread.currentThread().name)
        emit(i)
    }
}.flowOn(Dispatchers.Default)
simple7().collect { value -> println("collect:" + value.toString().plus(",currentThread=" + Thread.currentThread().name)) }

结果显示

Emitting 1,currentThreadDefaultDispatcher-worker-1
collect:1,currentThread=main
Emitting 2,currentThreadDefaultDispatcher-worker-1
collect:2,currentThread=main
Emitting 3,currentThreadDefaultDispatcher-worker-1
collect:3,currentThread=main

以上是关于协程中的流的主要内容,如果未能解决你的问题,请参考以下文章

协程中的流

深入理解Kotlin协程协程中的Channel和Flow & 协程中的线程安全问题

Kotlin 协程协程中的多路复用技术 ② ( select 函数原型 | SelectClauseN 事件 | 查看挂起函数是否支持 select )

你应该知道的协程中的挂起转化小技巧

你应该知道的协程中的挂起转化小技巧

python 并发专题(十三):asyncio 协程中的多任务