协程中的流

Posted ImportSUC

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了协程中的流相关的知识,希望对你有一定的参考价值。

协程中的异步流 基础

一.什么是异步流

可以连续返回多个值

与集合的区别:集合可以返回多个计算好的值,但是只能一次性返回

与Rxjava 的流是同一个概念

二.如何创建异步流

1.最基础的流构建器 flow

  • flow … 构建块中的代码可以挂起,不再需要 suspend 修饰符。
  • emit 发射, collect 收集。flow 构建器中的代码直到流被收集的时候才运行
  • 不阻塞线程
  • collect在哪个协程中调用,如果没有特殊操作,则flow代码块也在相同协程中运行

``

fun simple(): Flow<Int> = flow 
    println("Flow started")
    for (i in 1..3) 
        kotlinx.coroutines.delay(1000)
        emit(i)
    

fun main() = runBlocking 
     println("Calling simple function...")
     var flow = simple()
     println("Calling collect...")
     flow.collect  value -> println(value) 
     println("Calling collect again...")
     flow.collect  value -> println(value) 

结果显示

Calling simple function...
Calling collect...
Flow started
1
2
3
Calling collect again...
Flow started
1
2
3
end

2.flowof 构建器

定义一个发射固定值集的流

private fun simple2() = runBlocking 
    flowOf(1, 2, 3).collect  value -> println(value) 


3. asFlow() 扩展函数

使用asFlow() 扩展函数,可以将各种结合转换为流

private fun simaple3() = runBlocking 
    listOf(1, 2, 3).asFlow().collect  value -> println(value) 

三.操作符

1.filter , map 等过渡操作符

应用于上游,作用于下游

suspend fun performRequest(request: Int): String 
    delay(1000) 
    return "response $request"

private suspend fun simple5() 
    listOf(1, 2, 3).asFlow().filter  it > 2 .map  performRequest(it) 
    .collect  println(it) 

结果显示

response 3

2.take限长操作符

当满足条件时,Flow 会被取消,此时会抛出异常。需要用try 来处理

private fun simple4(): Flow<Int> = flow 
    try 
        emit(1)
        emit(2)
        println("This line will not execute")
        emit(3)
     catch (e: Exception) 
        e.printStackTrace()
     finally
      
    


simple4().take(1).collect  it -> println(it) 

结果显示:

1
kotlinx.coroutines.flow.internal.AbortFlowException: Flow was aborted, no more elements needed

3.末端流操作符

  • 在流上用于启动流收集的挂起函数
  • collect 是最基础的末端操作符
  • toListtoSet。转化成各种集合
  • first 取第一个值
  • single 确保只发送一个值,否则就报错:java.lang.IllegalStateException: Expected only one element
fun simple6() = runBlocking 
    var first = listOf(1, 2, 3).asFlow().map  it + 1 .first  it > 2 
    println(first)
  
    var single = listOf(1).asFlow().map  it + 1 .single()
    println(single)


4.flowOn 切换上下文

默认的,flow ... 构建器中的代码运行在相应流的收集器提供的上下文中

如果想让流运行在其他协程中,调用flowOn,创建一个新的协程供函数运行

fun simple7(): Flow<Int> = flow 
    for (i in 1..3) 
        Thread.sleep(100) 
        println("Emitting $i" + ",currentThread" + Thread.currentThread().name)
        emit(i)
    
.flowOn(Dispatchers.Default)
simple7().collect  value -> println("collect:" + value.toString().plus(",currentThread=" + Thread.currentThread().name)) 

结果显示

Emitting 1,currentThreadDefaultDispatcher-worker-1
collect:1,currentThread=main
Emitting 2,currentThreadDefaultDispatcher-worker-1
collect:2,currentThread=main
Emitting 3,currentThreadDefaultDispatcher-worker-1
collect:3,currentThread=main

以上是关于协程中的流的主要内容,如果未能解决你的问题,请参考以下文章

深入理解Kotlin协程协程中的Channel和Flow & 协程中的线程安全问题

python 并发专题(十三):asyncio 协程中的多任务

python 并发专题(十三):asyncio 协程中的多任务

Kotlin 协程中的挂起函数是啥意思?

使用context关闭协程以及协程中的协程

协程中的流