深入kotlin - Flow 进阶
Posted 颐和园
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入kotlin - Flow 进阶相关的知识,希望对你有一定的参考价值。
Flow 上下文
Flow 的收集动作总是发生在调用协程的上下文当中,而非定义 Flow 的上下文。
fun log(msg: String) = println("[$Thread.currentThread().name], $msg")
fun myMethod(): Flow<Int> = flow
log("started")
for(i in 1..3)
emit(i)
fun main() = runBlocking
myMethod().collect log("collected: $it")
运行输出结果如下(打开 debug 参数):
[main @coroutine#1], started
[main @coroutine#1], collected: 1
[main @coroutine#1], collected: 2
[main @coroutine#1], collected: 3
可以看出 flow 运行于 collect 调用时的协程,即 runBlocking 开启的协程。这无疑会阻塞住主线程。因此我们可以将 flow 运行在其它上下文:
private fun log(msg:String) = println("[$Thread.currentThread().name], $msg")
private fun myMethod(): Flow<Int> = flow
withContext(Dispatchers.Default)
for(i in 1..4)
Thread.sleep(100)
emit(i)
fun main() = runBlocking
myMethod().collect log(it)
程序报出如下异常:
Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutineActive@405324edf, BlockingEventLoop@44235043],
but emission happened in [DispatchedCoroutineActive@40452e23e, DefaultDispatcher].
...
大意是收集线程发生在主线程,但 emit 线程发生在后台线程。withContext(Dispatchers.Default) 一句修改了 flow 的上下文,将 flow 的分发器修改到了 Dispatchers.Default。但 collect 的代码使用的分发器则是 runBlocking 的分发器。
flowOn 运算符
为了解决这个问题,kotlin 引入了 flowOn 方法:
private fun myMethod(): Flow<Int> = flow
for(i in 1..4)
Thread.sleep(100)
log("emit: $i")
emit(i)
.flowOn(Dispatchers.Default)
fun main() = runBlocking
myMethod().collect println("collect: $it")
运行程序,错误不再出现。flowOn 方法允许修改 中间操作 的 上下文,使得中间操作和终止操作运行于不同的上下文。flowOn 操作改变了 Flow 本身默认的顺序。现在收集操作和发射操作发生在不同的协程(线程):
[DefaultDispatcher-worker-1 @coroutine#2], emit: 1
[main @coroutine#1], collect: 1
[DefaultDispatcher-worker-1 @coroutine#2], emit: 2
[main @coroutine#1], collect: 2
[DefaultDispatcher-worker-1 @coroutine#2], emit: 3
[main @coroutine#1], collect: 3
[DefaultDispatcher-worker-1 @coroutine#2], emit: 4
[main @coroutine#1], collect: 4
缓冲
如果让 flow 的不同部分使用不同的协程执行,将有助于实现并行操作从而提升性能。这就会用到缓冲的概念。典型地,我们可以让 emit 操作和collect 操作并行。这样 emit 操作在collect 操作进行的同时不会被阻塞,而是继续 emit 下一个元素并将结果缓冲到缓存里,下一次收集将直接从缓存中读取。
privatet fun myMethod(): Flow<Int> = flow
for(i in 1..4)
delay(100) // 模拟耗时操作
emit(i)
fun main() = runBlocking
val time = measureTimeMillis
myMethod().collect
delay(200)
println(it)
println(time)
这里,发射一个元素需要0.1秒,收集一个元素耗时0.2秒,所以4个元素总共耗时 1.2 秒,所以输出结果如下:
1
2
3
4
1223
如果使用缓冲操作,则不需要耗时这么多时间了:
myMethod().buffer().collect
delay(200)
println(it)
唯一的修改就是在 collect 之前增加了一个buffer() 操作,这样元素发射之后不会等待收集完成,而是直接将结果缓冲到缓存里。真正搜集到的实际上是缓存里的元素。这样无疑会加快整个发射和搜集的过程:
1
2
3
4
1003
第一次收集发生在 0.3秒(0.1秒发射+0.2秒收集),第二次收集发生在 0.5 秒,第三次收集发生在 0.7 秒,第四次收集发生在0.9 秒。加上一些建立、销毁缓冲的时间,大致总耗时1秒左右。
缓冲和 flowOn 有一定关系。本质上 flowOn 操作需要改变 CoroutineDispatcher 时也会使用同样的缓冲机制,比如上面 flowOn 的例子。
Flow 的组合
将两个flow的内容合并为一个flow。
fun main()=runBlocking<Unit>
val nums = (1..5).asFlow()
val strings = flowOf("a","b","c","d","e")
// zip - 合并操作
nums.zip(strings)
a, b -> "$a$b"
.collect println(it)
zip 操作将第一个流和第二个流中的元素依序取出,然后按照第二个参数的lambda 进行处理,然后发射。输出结果如下所示:
1a
2b
3c
4d
5e
打平操作
将一个包含了 f low 的 flow (类似 Flow<Flow)转换成不包含 flow 的flow(Flow)操作(类似将二维数组转换成一维数组)。
private fun myMethod(i: Int): Flow<String> = flow
emit("$i: First")
delay(500)
emit("$i: Second")
fun main() = runBlocking<Unit>
val startTime = System.currentTimeMillis()
(1..3).asFlow().onEach // 1. 遍历元素
delay(100)
.flatMapConcat // 2. 打平
myMethod(it)
.collect // 3. 终止
println("$it: $System.currentTimeMillis()-startTime ms")
myMethod 方法
输出结果如下:
1: 144 ms
1: 646 ms
2: 751 ms
2: 1256 ms
3: 1360 ms
3: 1865 ms
-
将1…3 转换为流,在每个元素(1…3)上 delay 0.1 秒。
-
将 3 个元素(1…3)转换为 3 个流,myMethod 方法每次都会产生两个相同元素的流(emit 两次)。这样 1…3 就会变成:
[1,1],[2,2],[3,3]
而且由于每个流之间有一个 delay 0.1秒,所以打印输出时,两个相同数字之间间隔0.5秒, 而两两一组之间间隔 0.1 秒。
-
此外 flatMapConcat 会将它从二维的流打平成一维的流,于是这个流在收集时会变成 6 个单独的元素:
[1,1,2,2,3,3]
Flow 的异常
可以对 flow的异常进行传统的 try…catch 处理:
private fun myMethod():Flow<Int> = flow
for(i in 1..3)
println("emit: $i")
emit(i)
fun main() = runBlocking<Unit>
try
myMethod().collect
println(it)
check(it <= 1)// check 函数的作用是检查第一个参数,当true 时执行第二个参数(lambda),否则抛出一个 IllegalStateException,同时 error message 的内容时lambda 表达式返回的 Any 类型。
"collect $it"
catch(e: Throwable)
println("Caught $e")
执行上述代码,输出:
emit: 1
1
emit: 2
2
Caught java.lang.IllegalStateException: collect 2
当第二个元素为 2 时,check 判断失败,抛出异常,被 try…catch 所捕获。
在上面的例子中,异常发生在收集阶段。但 try…catch 也可以捕获flow 的发射阶段和中间操作阶段。
private fun myMethod():Flow<String> = flow
for(i in 1..3)
println("emit: $i")
emit(i)
.map value ->
check(value <= 1) "crash on $value"
"string $value"
fun main()=runBlocking<Unit>
try
myMethod().collect println(it)
catch(e: Throwable)
println("caught $e")
运行程序,输出:
emit: 1
string 1
emit: 2
caught java.lang.IllegalStateException: crash on 2
当元素 大于 1 时,异常被捕获,并且 crash on 2 被保存在了异常的 message 当中。
完成
当 flow 执行完毕之后,可以额外追加一个完成动作。这个动作可以是命令式的,也可以时声明式的。
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main()=runBlocking<Unit>
try
myMethod().collect println(it)
finally
println("finally")
finally 块中的代码最终都会得到执行,无论 flow 是正常执行完还是抛出异常,这就是命令式,利用了 try…finally 语句:
1
2
...
10
finally
声明式则比较灵活,它就是 onCompletion 中间操作,它在 collect 或 取消 之后执行:
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking<Unit>
myMethod().onCompletion println("onCompletion")
.collect println(it)
输出结果同之前一模一样。值得注意的是,onCompletion 的 action 参数是一个带参数的 lambda 表达式,这个参数的类型是可空的 Throwable。可以利用这个参数获取抛出的异常。
private fun myMethod(): Flow<Int> = flow
emit(1)
throw RuntimeException() // 抛出异常
fun main() = runBlocking
myMethod().onCompletion e -> if(e!=null) println("Flow stopped with an Exception.")
.catch e -> println("Caught an Exception")
.collect println(it)
其中,catch 操作也是一个中间操作,用于捕获 flow 中的异常。
打印结果:
1
Flow stopped with an Exception.
Caught an Exception
但是有一点需要注意,onCompletion 只会看到来自于 flow 上游的异常,但无法捕获下游异常。
private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking<Unit>
myMethod().onCompletion e -> println("Flow stopped with an Exception $e.")
.collect value ->
checkvalue <= 1 "collect $value"
println(value)
输出结果如下:
1
Flow stopped with an Exception null.
Exception in thread "main" java.lang.IllegalStateException: Collect 2
...
这里,onCompletion 把 e 打印成 null,说明它并没有捕获到这个异常,因为 onCompletion 只是一个中间操作,而 collect 是终止操作,从这个角度上看 collect 是 onCompletion 的下游操作,因此 collect 中出现的异常无法被 onCompletion 捕获。
取消
flow 的取消实际上是通过协程的取消来实现的,本身没有所谓的取消操作。比如我们取消一个 cellect 操作,前提是 flow 本身在一个可取消的挂起函数(如 delay) 中被挂起了:
private fun myMethod(): Flow<Int> = flow
for (i int 1..4)
delay(100)
println("Emit: $i")
emit(i)
fun main() = runBlocking<Unit>
withTimeoutOrNull(280) // 设置超时时间 280 毫秒
myMethod().collect println(it)
println("Done.")
控制台输出如下:
Emit: 1
1
Emit: 2
2
Done.
第3、4次循环因超时被取消。这里,withTimeoutOrNull 就是一个可取消的挂起函数。
以上是关于深入kotlin - Flow 进阶的主要内容,如果未能解决你的问题,请参考以下文章