协程中的流
Posted ImportSUC
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了协程中的流相关的知识,希望对你有一定的参考价值。
协程中的异步流 基础
一.什么是异步流
可以连续返回多个值
与集合的区别:集合可以返回多个计算好的值,但是只能一次性返回
与Rxjava 的流是同一个概念
二.如何创建异步流
1.最基础的流构建器 flow
- flow … 构建块中的代码可以挂起,不再需要 suspend 修饰符。
- emit 发射, collect 收集。flow 构建器中的代码直到流被收集的时候才运行
- 不阻塞线程
- collect在哪个协程中调用,如果没有特殊操作,则flow代码块也在相同协程中运行
``
fun simple(): Flow<Int> = flow
println("Flow started")
for (i in 1..3)
kotlinx.coroutines.delay(1000)
emit(i)
fun main() = runBlocking
println("Calling simple function...")
var flow = simple()
println("Calling collect...")
flow.collect value -> println(value)
println("Calling collect again...")
flow.collect value -> println(value)
结果显示
Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
end
2.flowof 构建器
定义一个发射固定值集的流
private fun simple2() = runBlocking
flowOf(1, 2, 3).collect value -> println(value)
3. asFlow() 扩展函数
使用asFlow() 扩展函数,可以将各种结合转换为流
private fun simaple3() = runBlocking
listOf(1, 2, 3).asFlow().collect value -> println(value)
三.操作符
1.filter , map 等过渡操作符
应用于上游,作用于下游
suspend fun performRequest(request: Int): String
delay(1000)
return "response $request"
private suspend fun simple5()
listOf(1, 2, 3).asFlow().filter it > 2 .map performRequest(it)
.collect println(it)
结果显示
response 3
2.take限长操作符
当满足条件时,Flow 会被取消,此时会抛出异常。需要用try 来处理
private fun simple4(): Flow<Int> = flow
try
emit(1)
emit(2)
println("This line will not execute")
emit(3)
catch (e: Exception)
e.printStackTrace()
finally
simple4().take(1).collect it -> println(it)
结果显示:
1
kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed
3.末端流操作符
- 在流上用于启动流收集的挂起函数
collect
是最基础的末端操作符toList
与toSet
。转化成各种集合- first 取第一个值
- single 确保只发送一个值,否则就报错:
java.lang.IllegalStateException: Expected only one element
fun simple6() = runBlocking
var first = listOf(1, 2, 3).asFlow().map it + 1 .first it > 2
println(first)
var single = listOf(1).asFlow().map it + 1 .single()
println(single)
4.flowOn 切换上下文
默认的,flow ...
构建器中的代码运行在相应流的收集器提供的上下文中
如果想让流运行在其他协程中,调用flowOn,创建一个新的协程供函数运行
fun simple7(): Flow<Int> = flow
for (i in 1..3)
Thread.sleep(100)
println("Emitting $i" + ",currentThread" + Thread.currentThread().name)
emit(i)
.flowOn(Dispatchers.Default)
simple7().collect value -> println("collect:" + value.toString().plus(",currentThread=" + Thread.currentThread().name))
结果显示
Emitting 1,currentThreadDefaultDispatcher-worker-1
collect:1,currentThread=main
Emitting 2,currentThreadDefaultDispatcher-worker-1
collect:2,currentThread=main
Emitting 3,currentThreadDefaultDispatcher-worker-1
collect:3,currentThread=main
以上是关于协程中的流的主要内容,如果未能解决你的问题,请参考以下文章
深入理解Kotlin协程协程中的Channel和Flow & 协程中的线程安全问题
python 并发专题(十三):asyncio 协程中的多任务