协程中的流
Posted ImportSUC
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了协程中的流相关的知识,希望对你有一定的参考价值。
协程中的异步流 基础
一.什么是异步流
可以连续返回多个值
与集合的区别:集合可以返回多个计算好的值,但是只能一次性返回
与Rxjava 的流是同一个概念
二.如何创建异步流
1.最基础的流构建器 flow{}
- flow { … } 构建块中的代码可以挂起,不再需要 suspend 修饰符。
- emit 发射, collect 收集。flow 构建器中的代码直到流被收集的时候才运行
- 不阻塞线程
- collect在哪个协程中调用,如果没有特殊操作,则flow代码块也在相同协程中运行
``
fun simple(): Flow<Int> = flow {
println("Flow started")
for (i in 1..3) {
kotlinx.coroutines.delay(1000)
emit(i)
}
}
fun main() = runBlocking {
println("Calling simple function...")
var flow = simple()
println("Calling collect...")
flow.collect { value -> println(value) }
println("Calling collect again...")
flow.collect { value -> println(value) }
}
结果显示
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
end
2.flowof 构建器
定义一个发射固定值集的流
private fun simple2() = runBlocking {
flowOf(1, 2, 3).collect { value -> println(value) }
}
3. asFlow() 扩展函数
使用asFlow() 扩展函数,可以将各种结合转换为流
private fun simaple3() = runBlocking {
listOf(1, 2, 3).asFlow().collect { value -> println(value) }
}
三.操作符
1.filter , map 等过渡操作符
应用于上游,作用于下游
suspend fun performRequest(request: Int): String {
delay(1000)
return "response $request"
}
private suspend fun simple5() {
listOf(1, 2, 3).asFlow().filter { it > 2 }.map { performRequest(it) }
.collect { println(it) }
}
结果显示
response 3
2.take限长操作符
当满足条件时,Flow 会被取消,此时会抛出异常。需要用try{} 来处理
private fun simple4(): Flow<Int> = flow {
try {
emit(1)
emit(2)
println("This line will not execute")
emit(3)
} catch (e: Exception) {
e.printStackTrace()
} finally{
}
}
simple4().take(1).collect { it -> println(it) }
结果显示:
1
kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
3.末端流操作符
- 在流上用于启动流收集的挂起函数
collect
是最基础的末端操作符toList
与toSet
。转化成各种集合- first 取第一个值
- single 确保只发送一个值,否则就报错:
java.lang.IllegalStateException: Expected only one element
fun simple6() = runBlocking {
var first = listOf(1, 2, 3).asFlow().map { it + 1 }.first { it > 2 }
println(first)
var single = listOf(1).asFlow().map { it + 1 }.single()
println(single)
}
4.flowOn 切换上下文
默认的,flow { ... }
构建器中的代码运行在相应流的收集器提供的上下文中
如果想让流运行在其他协程中,调用flowOn,创建一个新的协程供函数运行
fun simple7(): Flow<Int> = flow {
for (i in 1..3) {
Thread.sleep(100)
println("Emitting $i" + ",currentThread" + Thread.currentThread().name)
emit(i)
}
}.flowOn(Dispatchers.Default)
simple7().collect { value -> println("collect:" + value.toString().plus(",currentThread=" + Thread.currentThread().name)) }
结果显示
Emitting 1,currentThreadDefaultDispatcher-worker-1
collect:1,currentThread=main
Emitting 2,currentThreadDefaultDispatcher-worker-1
collect:2,currentThread=main
Emitting 3,currentThreadDefaultDispatcher-worker-1
collect:3,currentThread=main
以上是关于协程中的流的主要内容,如果未能解决你的问题,请参考以下文章
深入理解Kotlin协程协程中的Channel和Flow & 协程中的线程安全问题
Kotlin 协程协程中的多路复用技术 ② ( select 函数原型 | SelectClauseN 事件 | 查看挂起函数是否支持 select )