Kotlin-Flow基础学习

Posted 且听真言

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin-Flow基础学习相关的知识,希望对你有一定的参考价值。

一、Flow简单介绍

Flow——数据流

Flow的特点是可以在发送和接收的过程中对数据进行变更。

  runBlocking 
        flow 
            emit(1)
            emit(2)
            emit(3)
            emit(4)
        .filter  it > 1 
            .map 
                it * 3
            
            .take(2)
            .collect 
                println(it)
            
    

Log:

6
9

Process finished with exit code 0

1.flow是一个高阶函数。用来创建一个Flow。

public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)

// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>() 
    override suspend fun collectSafely(collector: FlowCollector<T>) 
        collector.block()
    

通过挂起函数emit发送数据。 通过emit函数往下游发送数据。

public fun interface FlowCollector<in T> 

    /**
     * Collects the value emitted by the upstream.
     * This method is not thread-safe and should not be invoked concurrently.
     */
    public suspend fun emit(value: T)

filter、map、take(),属于中间操作符。它们的作用是对数据进行处理。Flow的操作符与集合操作符高度一致。

public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform  value ->
    if (predicate(value)) return@transform emit(value)

public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform  value ->
    return@transform emit(transform(value))
public fun <T> Flow<T>.take(count: Int): Flow<T> 
    require(count > 0)  "Requested element count $count should be positive" 
    return flow 
        var consumed = 0
        try 
            collect  value ->
                // Note: this for take is not written via collectWhile on purpose.
                // It checks condition first and then makes a tail-call to either emit or emitAbort.
                // This way normal execution does not require a state machine, only a termination (emitAbort).
                // See "TakeBenchmark" for comparision of different approaches.
                if (++consumed < count) 
                    return@collect emit(value)
                 else 
                    return@collect emitAbort(value)
                
            
         catch (e: AbortFlowException) 
            e.checkOwnership(owner = this)
        
    

collect称为终止操作符或者末端操作符,作用:终止数据流并接收数据。

public interface Flow<out T> 

    public suspend fun collect(collector: FlowCollector<T>)

2.通过flowOf()创建Flow

  runBlocking 
        flowOf(1, 2, 3, 4, 5).filter  it > 1 
            .map  it * 3 
            .take(2)
            .collect 
                println(it)
            
    

Log:
6
9

Process finished with exit code 0

3.可以将Flow转换为List

 runBlocking 
        flowOf(1,2,3,4,5)
            .toList()
            .filter  it > 1 
            .map  it * 3 
            .take(2)
            .forEach 
                println(it)
            
    
Log:
6
9

Process finished with exit code 0

4.List也可以转换为Flow

 runBlocking 
        listOf(1, 2, 3, 4, 5).asFlow()
            .filter  it > 1 
            .map  it * 3 
            .take(2)
            .collect 
                println(it)
            
    

Log
6
9

Process finished with exit code 0

Flow的创建方式适用场景用法
flow未知数据集flowemit(xx)
flowOf()已知具体的数据flowOf(1,2)
asFlow()已知数据来源的集合list.asFlow()

5.中间操作符

除了 map、filter、take 操作符之外,还有一些重要的操作符。

5.1onStart

onStart、onCompletion以操作符的形式存在,作用是监听生命周期回调。

   runBlocking 
        flowOf(1, 2, 3, 4, 5)
            .filter 
                println("do filter:$it")
                it > 3
            
            .map 
                println("do map:$it")
                it * 3
            
            .take(2)
            .onStart 
                println("onStart")
            
            .collect 
                println("collect:$it")
            
    

Log
onStart
do filter:1
do filter:2
do filter:3
do filter:4
do map:4
collect:12
do filter:5
do map:5
collect:15

Process finished with exit code 0

onStart是最先执行的,不会受到位置的影响,因为其本质是一个回调。

map、filter、take 操作符会受到位置的影响

 runBlocking 
        flowOf(1, 2, 3, 4, 5)
            .take(2)
            .filter 
                println("do filter:$it")
                it > 1
            
            .map 
                println("do map:$it")
                it * 3
            
            .onStart  println("onStart") 
            .collect 
                println("collect:$it")
            
    


Log

onStart
do filter:1
do filter:2
do map:2
collect:6

Process finished with exit code 0

 5.2onCompletion

  runBlocking 
        flowOf(1, 2, 3, 4, 5)
            .onCompletion 
                println("onCompletion")
            
            .filter 
                println("filter:$it")
                it > 3
            
            .take(2)
            .collect 
                println("collect:$it")
            
    

Log
filter:1
filter:2
filter:3
filter:4
collect:4
filter:5
collect:5
onCompletion

Process finished with exit code 0

onCompletion 只会在 Flow 数据流执行完毕以后,才会回调。

onCompletion 在面对以下三种情况时都会进行回调:

1 Flow 正常执行完毕;

  runBlocking 
        flowOf(1, 2, 3, 4, 5)
            .onCompletion 
                println("onCompletion")
            
            .filter 
                println("filter:$it")
                it > 3
            
            .take(2)
            .collect 
                println("collect:$it")
            
    

Log
filter:1
filter:2
filter:3
filter:4
collect:4
filter:5
collect:5
onCompletion

Process finished with exit code 0

2 Flow 当中出现异常;

   runBlocking 
        flowOf(1, 2, 3, 4, 5)
            .onCompletion 
                println("onCompletion when throw exception:$it")
            
            .collect 
                println("collect:$it")
                throw IllegalStateException()
            
    

Log

collect:1
onCompletion when throw exception:java.lang.IllegalStateException

3 Flow 被取消。

 runBlocking 
        launch 
            flow 
                emit(1)
                emit(2)
                emit(3)
                emit(4)
            .onCompletion 
                println("onCompletion when cancel:$it")
            
                .collect 
                    println("collect:$it")
                    if (it == 2) 
                        cancel()
                        println("cancel")
                    
                
        
    

Log:

collect:1
collect:2
cancel
onCompletion when cancel:kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#2":StandaloneCoroutineCancelling@42d8062c

Process finished with exit code 0

 在 collect 里调用了 cancel 方法,会取消掉整个 Flow,这时候,flow 当中剩下的代码将不会再被执行。最后,onCompletion 也会被调用,同时得到异常信息 JobCancellationException。同样当 Flow 当中发生异常以后,Flow 就会终止。

二.Flow异常处理

Flow 主要有三个部分:上游、中间操作、下游。Flow 当中的异常,也可以根据这个标准来进行分类,也就是异常发生的位置。

对于发生在上游、中间操作这两个阶段的异常,直接使用 catch 这个操作符来进行捕获和进一步处理。

runBlocking 
        val flow = flow 
            emit(1)
            emit(2)
            emit(3)
            throw IllegalStateException()
            emit(4)
        
        flow.map 
            it * 3
        
            .catch 
                println("catch exception:$it")
            
            .collect 
                println(it)
            
    

Log
3
6
9
catch exception:java.lang.IllegalStateException

Process finished with exit code 0

注意:catch的作用域,仅仅限制与catch的上游。

如果异常发生在catch的下游,则无法捕获。

 runBlocking 
        val flow = flow 
            emit(1)
            emit(2)
            emit(3)
            emit(4)
        
        flow.map 
            it * 2
        
            .catch 
                println("catch:$it")
            
            .filter 
                it / 0 > 1
            
            .collect 
                println(it)
            

    

Exception in thread "main" java.lang.ArithmeticException: / by zero
	at com.example.myapplication.testcoroutinue.TestFlowKt$testFlow11$1$invokeSuspend$$inlined$filter$1$2.emit(Emitters.kt:224)
	at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$2.emit(Errors.kt:158)
	at 

那么如何解决catch下游的异常呢?

答案是使用try catch

runBlocking 
        flowOf(1, 2, 3, 4, 5)
            .onCompletion 
                println("onCompletion:$it")
            
            .collect 
                try 
                    println("collect:$it")
                    throw IllegalStateException()
                 catch (e: Exception) 
                    println("exception:$e")
                
            
    

Log

collect:1
exception:java.lang.IllegalStateException
collect:2
exception:java.lang.IllegalStateException
collect:3
exception:java.lang.IllegalStateException
collect:4
exception:java.lang.IllegalStateException
collect:5
exception:java.lang.IllegalStateException
onCompletion:null

Process finished with exit code 0

三、切换Context:flowOn、launchIn

Flow 如何切换工作的线程?

1.使用flowOn 操作符。

 runBlocking 
        val flow = flow<Int> 
            printFlow("start")
            emit(1)
            printFlow("emit 1")
            emit(2)
            printFlow("emit 2")
            emit(3)
            printFlow("emit 3")
        
        flow.filter 
            printFlow("Filter:" + it)
            it > 2
        
            .flowOn(Dispatchers.IO)
            .collect 
                printFlow("collect:$it")
            
    

Log
start;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:1;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 1;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:2;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 2;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:3;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 3;Thread:DefaultDispatcher-worker-1 @coroutine#2
collect:3;Thread:main @coroutine#1

Process finished with exit code 0

可以发现flowOn也是作用于它的上游。flowOn 的上游,就是 flow、filter 当中的代码,所以,它们的代码全都运行在 DefaultDispatcher 这个线程池当中。只有 collect 当中的代码是运行在 main 线程当中的。

修改flowOn位置

    runBlocking 
        val flow = flow<Int> 
            printFlow("start")
            emit(1)
            printFlow("emit 1")
            emit(2)
            printFlow("emit 2")
            emit(3)
            printFlow("emit 3")
        
            .flowOn(Dispatchers.IO)
        flow.filter 
            printFlow("Filter:" + it)
            it > 2
        

            .collect 
                printFlow("collect:$it")
            
    

start;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 1;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 2;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 3;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:1;Thread:main @coroutine#1
Filter:2;Thread:main @coroutine#1
Filter:3;Thread:main @coroutine#1
collect:3;Thread:main @coroutine#1

Process finished with exit code 0

通过运行可以发现此时 flow 当中的代码运行在 DefaultDispatcher 当中,剩下的代码则执行在 main 线程。

2.withContext

如何指定collect中的Context?

使用withContext

runBlocking 
        val flow = flow<Int> 
            printFlow("start")
            emit(1)
            printFlow("emit 1")
            emit(2)
            printFlow("emit 2")
            emit(3)
            printFlow("emit 3")
        
            .flowOn(Dispatchers.IO)
        flow.filter 
            printFlow("Filter:" + it)
            it > 2
        

            .collect 
                withContext(mySingleDispatcherForFLow) 
                    printFlow("collect:$it")
                
            
    

val mySingleDispatcherForFLow = Executors.newSingleThreadExecutor 
    Thread(it, "MySingleThread").apply 
        isDaemon = true
    
.asCoroutineDispatcher()


Log:
start;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 1;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 2;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 3;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:1;Thread:main @coroutine#1
Filter:2;Thread:main @coroutine#1
Filter:3;Thread:main @coroutine#1
collect:3;Thread:MySingleThread @coroutine#1

Process finished with exit code 0

collect 里使用了 withContext,所以它的执行就交给了 MySingleThread。

 withContext可以进一步扩大包裹范围

runBlocking 
        val flow = flow<Int> 
            printFlow("start")
            emit(1)
            printFlow("emit 1")
            emit(2)
            printFlow("emit 2")
            emit(3)
            printFlow("emit 3")
        
            .flowOn(Dispatchers.IO)


        withContext(mySingleDispatcherForFLow) 
            flow.filter 
                printFlow("Filter:" + it)
                it > 2
            

                .collect 
                    printFlow("collect:$it")
                
        

    

start;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 1;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 2;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 3;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:1;Thread:MySingleThread @coroutine#1
Filter:2;Thread:MySingleThread @coroutine#1
Filter:3;Thread:MySingleThread @coroutine#1
collect:3;Thread:MySingleThread @coroutine#1

Process finished with exit code 0

3.launchIn 

 runBlocking 
        val flow = flow<Int> 
            printFlow("start")
            emit(1)
            printFlow("emit 1")
            emit(2)
            printFlow("emit 2")
            emit(3)
            printFlow("emit 3")
        .flowOn(Dispatchers.IO)
        val scope = CoroutineScope(mySingleDispatcherForFLow)
        flow.filter 
            printFlow("Filter:$it")
            it > 1
        .onEach 
            printFlow("onEach:$it")
        .launchIn(scope)


        delay(2000L)
    


Log
start;Thread:DefaultDispatcher-worker-1 @coroutine#3
emit 1;Thread:DefaultDispatcher-worker-1 @coroutine#3
emit 2;Thread:DefaultDispatcher-worker-1 @coroutine#3
emit 3;Thread:DefaultDispatcher-worker-1 @coroutine#3
Filter:1;Thread:MySingleThread @coroutine#2
Filter:2;Thread:MySingleThread @coroutine#2
onEach:2;Thread:MySingleThread @coroutine#2
Filter:3;Thread:MySingleThread @coroutine#2
onEach:3;Thread:MySingleThread @coroutine#2

Process finished with exit code 0

通过launchIn(scope)把上游代码分发到指定线程中。

public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch 
    collect() // tail-call

launchIn 从严格意义来讲,应该算是一个下游的终止操作符,因为它本质上是调用了 collect()。

上面的写法等价于下面的写法:

runBlocking 
        val flow = flow<Int> 
            printFlow("start")
            emit(1)
            printFlow("emit 1")
            emit(2)
            printFlow("emit 2")
            emit(3)
            printFlow("emit 3")
        .flowOn(Dispatchers.IO)
        val scope = CoroutineScope(mySingleDispatcherForFLow)


        scope.launch 
            flow.filter 
                printFlow("Filter:$it")
                it > 1
            .collect 
                printFlow("collect:$it")
            
        


        delay(2000L)
    

对于 Flow 当中的线程切换,可以使用 flowOn、launchIn、withContext,但其实,flowOn、launchIn 就已经可以满足需求了。另外,由于 Flow 当中直接使用 withContext 是很容易引发其他问题的,因此,withContext 在 Flow 当中是不被推荐的,谨慎要用。

四、下游,终止操作符

在 Flow 当中,终止操作符的意思就是终止整个 Flow 流程的操作符。collect 操作符之后,无法继续使用 map 之类的操作,因为 collect 是一个“终止”操作符,代表 Flow 数据流的终止。Flow 的终止操作符:比如 first()、single()、fold、reduce、toList()。

五、 为什么说Flow是“冷”的?

Channel 之所以被认为是“热”的原因,是因为不管有没有接收方,发送方都会工作。Flow 被认为是“冷”的就是因为只有调用终止操作符之后,Flow 才会开始工作。

 runBlocking 
        val flow = flow<Int> 
            (1..5).forEach 
                println("Before flow emit:$it")
                emit(it)
                println("After flow emit:$it")
            
        

        val channel = produce<Int> 
            (1..5).forEach 
                println("Before channel send:$it")
                send(it)
                println("After channel send:$it")
            
        

        println("End!")
    

Log
End!
Before channel send:1

Flow还是懒的。这个懒表现在Flow 一次只会处理一条数据。

 runBlocking 
        flow 
            println("emit:1")
            emit(1)
            println("emit:2")
            emit(2)
            println("emit:3")
            emit(3)
        .filter 
            println("filter:$it")
            it > 0
        .map 
            println("map:$it")
            it * 2
        .collect 
            println("collect:$it")
        
    


Log
emit:1
filter:1
map:1
collect:2
emit:2
filter:2
map:2
collect:4
emit:3
filter:3
map:3
collect:6

以上是关于Kotlin-Flow基础学习的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin-Flow常见场景下的使用

关于Spring Cloud Gateway与下游服务器的连接分析

关于Spring Cloud Gateway与下游服务器的连接分析

紧跟“新基建”产业布局,「滨合毕方」发力分布式存储技术上下游产业链

持续集成学习7 jenkins自动化代码构建

如何使用 DATAX 以 UPSERT 语义更新下游 ORACLE 数据库中的数据?