Kotlin-Flow基础学习
Posted 且听真言
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin-Flow基础学习相关的知识,希望对你有一定的参考价值。
一、Flow简单介绍
Flow——数据流
Flow的特点是可以在发送和接收的过程中对数据进行变更。
runBlocking
flow
emit(1)
emit(2)
emit(3)
emit(4)
.filter it > 1
.map
it * 3
.take(2)
.collect
println(it)
Log:
6
9
Process finished with exit code 0
1.flow是一个高阶函数。用来创建一个Flow。
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
// Named anonymous object
private class SafeFlow<T>(private val block: suspend FlowCollector<T>.() -> Unit) : AbstractFlow<T>()
override suspend fun collectSafely(collector: FlowCollector<T>)
collector.block()
通过挂起函数emit发送数据。 通过emit函数往下游发送数据。
public fun interface FlowCollector<in T>
/**
* Collects the value emitted by the upstream.
* This method is not thread-safe and should not be invoked concurrently.
*/
public suspend fun emit(value: T)
filter、map、take(),属于中间操作符。它们的作用是对数据进行处理。Flow的操作符与集合操作符高度一致。
public inline fun <T> Flow<T>.filter(crossinline predicate: suspend (T) -> Boolean): Flow<T> = transform value ->
if (predicate(value)) return@transform emit(value)
public inline fun <T, R> Flow<T>.map(crossinline transform: suspend (value: T) -> R): Flow<R> = transform value ->
return@transform emit(transform(value))
public fun <T> Flow<T>.take(count: Int): Flow<T>
require(count > 0) "Requested element count $count should be positive"
return flow
var consumed = 0
try
collect value ->
// Note: this for take is not written via collectWhile on purpose.
// It checks condition first and then makes a tail-call to either emit or emitAbort.
// This way normal execution does not require a state machine, only a termination (emitAbort).
// See "TakeBenchmark" for comparision of different approaches.
if (++consumed < count)
return@collect emit(value)
else
return@collect emitAbort(value)
catch (e: AbortFlowException)
e.checkOwnership(owner = this)
collect称为终止操作符或者末端操作符,作用:终止数据流并接收数据。
public interface Flow<out T>
public suspend fun collect(collector: FlowCollector<T>)
2.通过flowOf()创建Flow
runBlocking
flowOf(1, 2, 3, 4, 5).filter it > 1
.map it * 3
.take(2)
.collect
println(it)
Log:
6
9
Process finished with exit code 0
3.可以将Flow转换为List
runBlocking
flowOf(1,2,3,4,5)
.toList()
.filter it > 1
.map it * 3
.take(2)
.forEach
println(it)
Log:
6
9
Process finished with exit code 0
4.List也可以转换为Flow
runBlocking
listOf(1, 2, 3, 4, 5).asFlow()
.filter it > 1
.map it * 3
.take(2)
.collect
println(it)
Log
6
9
Process finished with exit code 0
Flow的创建方式 | 适用场景 | 用法 |
flow | 未知数据集 | flowemit(xx) |
flowOf() | 已知具体的数据 | flowOf(1,2) |
asFlow() | 已知数据来源的集合 | list.asFlow() |
5.中间操作符
除了 map、filter、take 操作符之外,还有一些重要的操作符。
5.1onStart
onStart、onCompletion以操作符的形式存在,作用是监听生命周期回调。
runBlocking
flowOf(1, 2, 3, 4, 5)
.filter
println("do filter:$it")
it > 3
.map
println("do map:$it")
it * 3
.take(2)
.onStart
println("onStart")
.collect
println("collect:$it")
Log
onStart
do filter:1
do filter:2
do filter:3
do filter:4
do map:4
collect:12
do filter:5
do map:5
collect:15
Process finished with exit code 0
onStart是最先执行的,不会受到位置的影响,因为其本质是一个回调。
map、filter、take 操作符会受到位置的影响
runBlocking
flowOf(1, 2, 3, 4, 5)
.take(2)
.filter
println("do filter:$it")
it > 1
.map
println("do map:$it")
it * 3
.onStart println("onStart")
.collect
println("collect:$it")
Log
onStart
do filter:1
do filter:2
do map:2
collect:6
Process finished with exit code 0
5.2onCompletion
runBlocking
flowOf(1, 2, 3, 4, 5)
.onCompletion
println("onCompletion")
.filter
println("filter:$it")
it > 3
.take(2)
.collect
println("collect:$it")
Log
filter:1
filter:2
filter:3
filter:4
collect:4
filter:5
collect:5
onCompletion
Process finished with exit code 0
onCompletion 只会在 Flow 数据流执行完毕以后,才会回调。
onCompletion 在面对以下三种情况时都会进行回调:
1 Flow 正常执行完毕;
runBlocking
flowOf(1, 2, 3, 4, 5)
.onCompletion
println("onCompletion")
.filter
println("filter:$it")
it > 3
.take(2)
.collect
println("collect:$it")
Log
filter:1
filter:2
filter:3
filter:4
collect:4
filter:5
collect:5
onCompletion
Process finished with exit code 0
2 Flow 当中出现异常;
runBlocking
flowOf(1, 2, 3, 4, 5)
.onCompletion
println("onCompletion when throw exception:$it")
.collect
println("collect:$it")
throw IllegalStateException()
Log
collect:1
onCompletion when throw exception:java.lang.IllegalStateException
3 Flow 被取消。
runBlocking
launch
flow
emit(1)
emit(2)
emit(3)
emit(4)
.onCompletion
println("onCompletion when cancel:$it")
.collect
println("collect:$it")
if (it == 2)
cancel()
println("cancel")
Log:
collect:1
collect:2
cancel
onCompletion when cancel:kotlinx.coroutines.JobCancellationException: StandaloneCoroutine was cancelled; job="coroutine#2":StandaloneCoroutineCancelling@42d8062c
Process finished with exit code 0
在 collect 里调用了 cancel 方法,会取消掉整个 Flow,这时候,flow 当中剩下的代码将不会再被执行。最后,onCompletion 也会被调用,同时得到异常信息 JobCancellationException。同样当 Flow 当中发生异常以后,Flow 就会终止。
二.Flow异常处理
Flow 主要有三个部分:上游、中间操作、下游。Flow 当中的异常,也可以根据这个标准来进行分类,也就是异常发生的位置。
对于发生在上游、中间操作这两个阶段的异常,直接使用 catch 这个操作符来进行捕获和进一步处理。
runBlocking
val flow = flow
emit(1)
emit(2)
emit(3)
throw IllegalStateException()
emit(4)
flow.map
it * 3
.catch
println("catch exception:$it")
.collect
println(it)
Log
3
6
9
catch exception:java.lang.IllegalStateException
Process finished with exit code 0
注意:catch的作用域,仅仅限制与catch的上游。
如果异常发生在catch的下游,则无法捕获。
runBlocking
val flow = flow
emit(1)
emit(2)
emit(3)
emit(4)
flow.map
it * 2
.catch
println("catch:$it")
.filter
it / 0 > 1
.collect
println(it)
Exception in thread "main" java.lang.ArithmeticException: / by zero
at com.example.myapplication.testcoroutinue.TestFlowKt$testFlow11$1$invokeSuspend$$inlined$filter$1$2.emit(Emitters.kt:224)
at kotlinx.coroutines.flow.FlowKt__ErrorsKt$catchImpl$2.emit(Errors.kt:158)
at
那么如何解决catch下游的异常呢?
答案是使用try catch
runBlocking
flowOf(1, 2, 3, 4, 5)
.onCompletion
println("onCompletion:$it")
.collect
try
println("collect:$it")
throw IllegalStateException()
catch (e: Exception)
println("exception:$e")
Log
collect:1
exception:java.lang.IllegalStateException
collect:2
exception:java.lang.IllegalStateException
collect:3
exception:java.lang.IllegalStateException
collect:4
exception:java.lang.IllegalStateException
collect:5
exception:java.lang.IllegalStateException
onCompletion:null
Process finished with exit code 0
三、切换Context:flowOn、launchIn
Flow 如何切换工作的线程?
1.使用flowOn 操作符。
runBlocking
val flow = flow<Int>
printFlow("start")
emit(1)
printFlow("emit 1")
emit(2)
printFlow("emit 2")
emit(3)
printFlow("emit 3")
flow.filter
printFlow("Filter:" + it)
it > 2
.flowOn(Dispatchers.IO)
.collect
printFlow("collect:$it")
Log
start;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:1;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 1;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:2;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 2;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:3;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 3;Thread:DefaultDispatcher-worker-1 @coroutine#2
collect:3;Thread:main @coroutine#1
Process finished with exit code 0
可以发现flowOn也是作用于它的上游。flowOn 的上游,就是 flow、filter 当中的代码,所以,它们的代码全都运行在 DefaultDispatcher 这个线程池当中。只有 collect 当中的代码是运行在 main 线程当中的。
修改flowOn位置
runBlocking
val flow = flow<Int>
printFlow("start")
emit(1)
printFlow("emit 1")
emit(2)
printFlow("emit 2")
emit(3)
printFlow("emit 3")
.flowOn(Dispatchers.IO)
flow.filter
printFlow("Filter:" + it)
it > 2
.collect
printFlow("collect:$it")
start;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 1;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 2;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 3;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:1;Thread:main @coroutine#1
Filter:2;Thread:main @coroutine#1
Filter:3;Thread:main @coroutine#1
collect:3;Thread:main @coroutine#1
Process finished with exit code 0
通过运行可以发现此时 flow 当中的代码运行在 DefaultDispatcher 当中,剩下的代码则执行在 main 线程。
2.withContext
如何指定collect中的Context?
使用withContext
runBlocking
val flow = flow<Int>
printFlow("start")
emit(1)
printFlow("emit 1")
emit(2)
printFlow("emit 2")
emit(3)
printFlow("emit 3")
.flowOn(Dispatchers.IO)
flow.filter
printFlow("Filter:" + it)
it > 2
.collect
withContext(mySingleDispatcherForFLow)
printFlow("collect:$it")
val mySingleDispatcherForFLow = Executors.newSingleThreadExecutor
Thread(it, "MySingleThread").apply
isDaemon = true
.asCoroutineDispatcher()
Log:
start;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 1;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 2;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 3;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:1;Thread:main @coroutine#1
Filter:2;Thread:main @coroutine#1
Filter:3;Thread:main @coroutine#1
collect:3;Thread:MySingleThread @coroutine#1
Process finished with exit code 0
collect 里使用了 withContext,所以它的执行就交给了 MySingleThread。
withContext可以进一步扩大包裹范围
runBlocking
val flow = flow<Int>
printFlow("start")
emit(1)
printFlow("emit 1")
emit(2)
printFlow("emit 2")
emit(3)
printFlow("emit 3")
.flowOn(Dispatchers.IO)
withContext(mySingleDispatcherForFLow)
flow.filter
printFlow("Filter:" + it)
it > 2
.collect
printFlow("collect:$it")
start;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 1;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 2;Thread:DefaultDispatcher-worker-1 @coroutine#2
emit 3;Thread:DefaultDispatcher-worker-1 @coroutine#2
Filter:1;Thread:MySingleThread @coroutine#1
Filter:2;Thread:MySingleThread @coroutine#1
Filter:3;Thread:MySingleThread @coroutine#1
collect:3;Thread:MySingleThread @coroutine#1
Process finished with exit code 0
3.launchIn
runBlocking
val flow = flow<Int>
printFlow("start")
emit(1)
printFlow("emit 1")
emit(2)
printFlow("emit 2")
emit(3)
printFlow("emit 3")
.flowOn(Dispatchers.IO)
val scope = CoroutineScope(mySingleDispatcherForFLow)
flow.filter
printFlow("Filter:$it")
it > 1
.onEach
printFlow("onEach:$it")
.launchIn(scope)
delay(2000L)
Log
start;Thread:DefaultDispatcher-worker-1 @coroutine#3
emit 1;Thread:DefaultDispatcher-worker-1 @coroutine#3
emit 2;Thread:DefaultDispatcher-worker-1 @coroutine#3
emit 3;Thread:DefaultDispatcher-worker-1 @coroutine#3
Filter:1;Thread:MySingleThread @coroutine#2
Filter:2;Thread:MySingleThread @coroutine#2
onEach:2;Thread:MySingleThread @coroutine#2
Filter:3;Thread:MySingleThread @coroutine#2
onEach:3;Thread:MySingleThread @coroutine#2
Process finished with exit code 0
通过launchIn(scope)把上游代码分发到指定线程中。
public fun <T> Flow<T>.launchIn(scope: CoroutineScope): Job = scope.launch
collect() // tail-call
launchIn 从严格意义来讲,应该算是一个下游的终止操作符,因为它本质上是调用了 collect()。
上面的写法等价于下面的写法:
runBlocking
val flow = flow<Int>
printFlow("start")
emit(1)
printFlow("emit 1")
emit(2)
printFlow("emit 2")
emit(3)
printFlow("emit 3")
.flowOn(Dispatchers.IO)
val scope = CoroutineScope(mySingleDispatcherForFLow)
scope.launch
flow.filter
printFlow("Filter:$it")
it > 1
.collect
printFlow("collect:$it")
delay(2000L)
对于 Flow 当中的线程切换,可以使用 flowOn、launchIn、withContext,但其实,flowOn、launchIn 就已经可以满足需求了。另外,由于 Flow 当中直接使用 withContext 是很容易引发其他问题的,因此,withContext 在 Flow 当中是不被推荐的,谨慎要用。
四、下游,终止操作符
在 Flow 当中,终止操作符的意思就是终止整个 Flow 流程的操作符。collect 操作符之后,无法继续使用 map 之类的操作,因为 collect 是一个“终止”操作符,代表 Flow 数据流的终止。Flow 的终止操作符:比如 first()、single()、fold、reduce、toList()。
五、 为什么说Flow是“冷”的?
Channel 之所以被认为是“热”的原因,是因为不管有没有接收方,发送方都会工作。Flow 被认为是“冷”的就是因为只有调用终止操作符之后,Flow 才会开始工作。
runBlocking
val flow = flow<Int>
(1..5).forEach
println("Before flow emit:$it")
emit(it)
println("After flow emit:$it")
val channel = produce<Int>
(1..5).forEach
println("Before channel send:$it")
send(it)
println("After channel send:$it")
println("End!")
Log
End!
Before channel send:1
Flow还是懒的。这个懒表现在Flow 一次只会处理一条数据。
runBlocking
flow
println("emit:1")
emit(1)
println("emit:2")
emit(2)
println("emit:3")
emit(3)
.filter
println("filter:$it")
it > 0
.map
println("map:$it")
it * 2
.collect
println("collect:$it")
Log
emit:1
filter:1
map:1
collect:2
emit:2
filter:2
map:2
collect:4
emit:3
filter:3
map:3
collect:6
以上是关于Kotlin-Flow基础学习的主要内容,如果未能解决你的问题,请参考以下文章
关于Spring Cloud Gateway与下游服务器的连接分析
关于Spring Cloud Gateway与下游服务器的连接分析