深入kotlin - Flow 进阶

Posted 颐和园

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入kotlin - Flow 进阶相关的知识,希望对你有一定的参考价值。

Flow 上下文

Flow 的收集动作总是发生在调用协程的上下文当中,而非定义 Flow 的上下文。

fun log(msg: String) = println("[$Thread.currentThread().name], $msg")
fun myMethod(): Flow<Int> = flow 
  log("started")
  for(i in 1..3)
    emit(i)
  

fun main() = runBlocking 
	myMethod().collect  log("collected: $it")

运行输出结果如下(打开 debug 参数):

[main @coroutine#1], started
[main @coroutine#1], collected: 1
[main @coroutine#1], collected: 2
[main @coroutine#1], collected: 3

可以看出 flow 运行于 collect 调用时的协程,即 runBlocking 开启的协程。这无疑会阻塞住主线程。因此我们可以将 flow 运行在其它上下文:

private fun log(msg:String) = println("[$Thread.currentThread().name], $msg")
private fun myMethod(): Flow<Int> = flow 
	withContext(Dispatchers.Default) 
		for(i in 1..4)
			Thread.sleep(100)
			emit(i)
		
	

fun main() = runBlocking 
	myMethod().collect  log(it) 

程序报出如下异常:

Exception in thread "main" java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutineActive@405324edf, BlockingEventLoop@44235043],
but emission happened in [DispatchedCoroutineActive@40452e23e, DefaultDispatcher].
...

大意是收集线程发生在主线程,但 emit 线程发生在后台线程。withContext(Dispatchers.Default) 一句修改了 flow 的上下文,将 flow 的分发器修改到了 Dispatchers.Default。但 collect 的代码使用的分发器则是 runBlocking 的分发器。

flowOn 运算符

为了解决这个问题,kotlin 引入了 flowOn 方法:

private fun myMethod(): Flow<Int> = flow 
	for(i in 1..4)
		Thread.sleep(100)
    log("emit: $i")
		emit(i)
	.flowOn(Dispatchers.Default)

fun main() = runBlocking 
	myMethod().collect  println("collect: $it") 

运行程序,错误不再出现。flowOn 方法允许修改 中间操作 的 上下文,使得中间操作和终止操作运行于不同的上下文。flowOn 操作改变了 Flow 本身默认的顺序。现在收集操作和发射操作发生在不同的协程(线程):

[DefaultDispatcher-worker-1 @coroutine#2], emit: 1
[main @coroutine#1], collect: 1
[DefaultDispatcher-worker-1 @coroutine#2], emit: 2
[main @coroutine#1], collect: 2
[DefaultDispatcher-worker-1 @coroutine#2], emit: 3
[main @coroutine#1], collect: 3
[DefaultDispatcher-worker-1 @coroutine#2], emit: 4
[main @coroutine#1], collect: 4
缓冲

如果让 flow 的不同部分使用不同的协程执行,将有助于实现并行操作从而提升性能。这就会用到缓冲的概念。典型地,我们可以让 emit 操作和collect 操作并行。这样 emit 操作在collect 操作进行的同时不会被阻塞,而是继续 emit 下一个元素并将结果缓冲到缓存里,下一次收集将直接从缓存中读取。

privatet fun myMethod(): Flow<Int> = flow 
	for(i in 1..4)
		delay(100) // 模拟耗时操作
		emit(i)
	

fun main() = runBlocking 
  val time = measureTimeMillis 
    myMethod().collect 
      delay(200)
      println(it)
    
  
  println(time)

这里,发射一个元素需要0.1秒,收集一个元素耗时0.2秒,所以4个元素总共耗时 1.2 秒,所以输出结果如下:

1
2
3
4
1223

如果使用缓冲操作,则不需要耗时这么多时间了:

myMethod().buffer().collect 
      delay(200)
      println(it)
    

唯一的修改就是在 collect 之前增加了一个buffer() 操作,这样元素发射之后不会等待收集完成,而是直接将结果缓冲到缓存里。真正搜集到的实际上是缓存里的元素。这样无疑会加快整个发射和搜集的过程:

1
2
3
4
1003

第一次收集发生在 0.3秒(0.1秒发射+0.2秒收集),第二次收集发生在 0.5 秒,第三次收集发生在 0.7 秒,第四次收集发生在0.9 秒。加上一些建立、销毁缓冲的时间,大致总耗时1秒左右。

缓冲和 flowOn 有一定关系。本质上 flowOn 操作需要改变 CoroutineDispatcher 时也会使用同样的缓冲机制,比如上面 flowOn 的例子。

Flow 的组合

将两个flow的内容合并为一个flow。

fun main()=runBlocking<Unit> 
	val nums = (1..5).asFlow()
  val strings = flowOf("a","b","c","d","e")
  // zip - 合并操作
  nums.zip(strings)
    a, b -> "$a$b"    
  .collect println(it) 

zip 操作将第一个流和第二个流中的元素依序取出,然后按照第二个参数的lambda 进行处理,然后发射。输出结果如下所示:

1a
2b
3c
4d
5e
打平操作

将一个包含了 f low 的 flow (类似 Flow<Flow)转换成不包含 flow 的flow(Flow)操作(类似将二维数组转换成一维数组)。

private fun myMethod(i: Int): Flow<String> = flow 
	emit("$i: First")
	delay(500)
	emit("$i: Second")

fun main() = runBlocking<Unit> 
  val startTime = System.currentTimeMillis()
  (1..3).asFlow().onEach // 1. 遍历元素
  	delay(100)
  .flatMapConcat // 2. 打平
    myMethod(it)
  .collect  // 3. 终止
    println("$it: $System.currentTimeMillis()-startTime ms")
  

myMethod 方法

输出结果如下:

1: 144 ms
1: 646 ms
2: 751 ms
2: 1256 ms
3: 1360 ms
3: 1865 ms
  1. 将1…3 转换为流,在每个元素(1…3)上 delay 0.1 秒。

  2. 将 3 个元素(1…3)转换为 3 个流,myMethod 方法每次都会产生两个相同元素的流(emit 两次)。这样 1…3 就会变成:

    [1,1],[2,2],[3,3]

    而且由于每个流之间有一个 delay 0.1秒,所以打印输出时,两个相同数字之间间隔0.5秒, 而两两一组之间间隔 0.1 秒。

  3. 此外 flatMapConcat 会将它从二维的流打平成一维的流,于是这个流在收集时会变成 6 个单独的元素:

    [1,1,2,2,3,3]

Flow 的异常

可以对 flow的异常进行传统的 try…catch 处理:

private fun myMethod():Flow<Int> = flow 
  for(i in 1..3) 
    println("emit: $i")
    emit(i)
  

fun main() = runBlocking<Unit> 
  try 
    myMethod().collect 
      println(it)
      check(it <= 1)// check 函数的作用是检查第一个参数,当true 时执行第二个参数(lambda),否则抛出一个 IllegalStateException,同时 error message 的内容时lambda 表达式返回的 Any 类型。
        "collect $it"
      
    
  catch(e: Throwable) 
    println("Caught $e")
  

执行上述代码,输出:

emit: 1
1
emit: 2
2
Caught java.lang.IllegalStateException: collect  2

当第二个元素为 2 时,check 判断失败,抛出异常,被 try…catch 所捕获。

在上面的例子中,异常发生在收集阶段。但 try…catch 也可以捕获flow 的发射阶段和中间操作阶段。

private fun myMethod():Flow<String> = flow 
	for(i in 1..3) 
    println("emit: $i")
    emit(i)
  .map value ->
  	check(value <= 1)  "crash on $value" 
    "string $value"
  

fun main()=runBlocking<Unit> 
  try
    myMethod().collect println(it) 
  catch(e: Throwable)
    println("caught $e")
  

运行程序,输出:

emit: 1
string 1
emit: 2
caught java.lang.IllegalStateException: crash on 2

当元素 大于 1 时,异常被捕获,并且 crash on 2 被保存在了异常的 message 当中。

完成

当 flow 执行完毕之后,可以额外追加一个完成动作。这个动作可以是命令式的,也可以时声明式的。

private fun myMethod(): Flow<Int> = (1..10).asFlow()

fun main()=runBlocking<Unit> 
	try 
		myMethod().collect  println(it) 
	finally 
		println("finally")
	

finally 块中的代码最终都会得到执行,无论 flow 是正常执行完还是抛出异常,这就是命令式,利用了 try…finally 语句:

1
2
...
10
finally

声明式则比较灵活,它就是 onCompletion 中间操作,它在 collect 或 取消 之后执行:

private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking<Unit> 
	myMethod().onCompletion println("onCompletion") 
  .collect  println(it) 

输出结果同之前一模一样。值得注意的是,onCompletion 的 action 参数是一个带参数的 lambda 表达式,这个参数的类型是可空的 Throwable。可以利用这个参数获取抛出的异常。

private fun myMethod(): Flow<Int> = flow 
	emit(1)
	throw RuntimeException() // 抛出异常

fun main() = runBlocking 
  myMethod().onCompletion  e -> if(e!=null) println("Flow stopped with an Exception.")
  .catch  e -> println("Caught an Exception") 
  .collect  println(it) 

其中,catch 操作也是一个中间操作,用于捕获 flow 中的异常。

打印结果:

1
Flow stopped with an Exception.
Caught an Exception

但是有一点需要注意,onCompletion 只会看到来自于 flow 上游的异常,但无法捕获下游异常。

private fun myMethod(): Flow<Int> = (1..10).asFlow()
fun main() = runBlocking<Unit> 
	myMethod().onCompletion e -> println("Flow stopped with an Exception $e.")
  .collect  value -> 
     checkvalue <= 1  "collect $value" 
			println(value)
	

输出结果如下:

1
Flow stopped with an Exception null.
Exception in thread "main" java.lang.IllegalStateException: Collect 2
...

这里,onCompletion 把 e 打印成 null,说明它并没有捕获到这个异常,因为 onCompletion 只是一个中间操作,而 collect 是终止操作,从这个角度上看 collect 是 onCompletion 的下游操作,因此 collect 中出现的异常无法被 onCompletion 捕获。

取消

flow 的取消实际上是通过协程的取消来实现的,本身没有所谓的取消操作。比如我们取消一个 cellect 操作,前提是 flow 本身在一个可取消的挂起函数(如 delay) 中被挂起了:

private fun myMethod(): Flow<Int> = flow 
  for (i int 1..4) 
    delay(100)
    println("Emit: $i")
    emit(i)
  

fun main() = runBlocking<Unit> 
  withTimeoutOrNull(280) // 设置超时时间 280 毫秒
  	myMethod().collect  println(it) 
  
  println("Done.")

控制台输出如下:

Emit: 1
1
Emit: 2
2
Done.

第3、4次循环因超时被取消。这里,withTimeoutOrNull 就是一个可取消的挂起函数。

以上是关于深入kotlin - Flow 进阶的主要内容,如果未能解决你的问题,请参考以下文章

深入kotlin - Flow 进阶

深入kotlin - Flow 进阶

Kotlin Flow响应式编程,操作符函数进阶

Kotlin Flow响应式编程,操作符函数进阶

Kotlin Flow响应式编程,操作符函数进阶

深入kotlin - 初识 Flow