深入kotlin - 初识 Flow

Posted 颐和园

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入kotlin - 初识 Flow相关的知识,希望对你有一定的参考价值。

asContextElement

ThreadLocal 是 Java 中线程间数据访问冲突中的一种解决方式,常用于取代锁。其会将线程间共享的数据复制为多份(每个线程的 ThreadLocal 数据维护在一张 map 表中,其中的 key 就是每个线程对象。这张 map 中,key 是 ThreadLocal 自己,value 是复制后的数据)。这样,每个线程操作一份数据,从而解决访问冲突。

但在协程环境中,这个问题变得复杂。因为协程并没有和线程绑定,一个协程在执行过程中是可以切换线程的(例如之前 Dispatchers.Unconfined 的例子)。

Kotlin 通过一系列的扩张方法解决这个问题。当协程从线程 A 切换到 B 然后又回到 A,线程 A 的 ThreadLocal 属性自动恢复。这就是 asContextElement(value:) 方法,它是 ThreadLocal 的扩展方法,用于将一个 ThreadLocal 包装成 ThreadContextElement。ThreadContextElement 会将 ThreadLocal 值复制到协程中,但不与特定线程绑定,value 参数会覆盖 ThreadLocal 中的 value。

来看这个例子。

val threadLocal = ThreadLocal<String>()
fun main() = runBlocking<Unit> 
  threadLocal.set("Jim")
  println("$Thread.currentThread(): $threadLocal.get()") // 打印:Thread[main @coroutine#1,5,main]: Jim
  
  val job = launch(Dispatchers.Default+threadLocal.asContextElement(value="Ann"))// ThreadLocal -> ThreadContextElement,同时覆盖 threadLocal 的 value,因此,threadLocal 的值会从 Jim 变成 Ann。
    println("$Thread.currentThread(): $threadLocal.get()") // 打印:Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
    yield() // 将协程切换到(线程池中的)其它线程执行
		println("$Thread.currentThread(): $threadLocal.get()") // 打印:Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
  
  job.join() // 回到主协程,threadLocal 值自动恢复
  println("$Thread.currentThread(), $threadLocal.get()") // 打印:Thread[main @coroutine#1,5,main]: Jim

程序的整个输出如下(打开 coroutines.debug 开关):

Thread[main @coroutine#1,5,main]: Jim
Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
Thread[main @coroutine#1,5,main]: Jim

ThreadContextElement 不会跟踪 thread local 的值,它只是原来值的拷贝,所以对它的任何修改都不会影响原来 thread local 的值。

如果你在协程中直接使用了 thread local(没有使用 asContextElement 方法),并修改了 thread local 的值,则 thread local 的值可能会变得不确定,如:

val t1 = ThreadLocal.withInitial "initial" 
runBlocking  // 注意,这里没有用 asContextElement(value:) 方法
	println(t1.get()) // 打印:initial
	withContext(t1.asContextElement("modified"))  
		println(t1.get()) // 打印:modified
	
	// 返回原来的上下文
	println(t1.get())// 可能打印:initial,也可能打印:modified,不确定

Flow

Flow 表示一个异步流,类似于 Java stream。如果一个函数需要返回多个值,除了使用集合外,还可以返回 Flow:

// 方法一、使用集合
private fun myMethod(): List<String> = listOf("How are you doing", "Not bad","Thank you", "How about you")
fun main() 
  myMethod().forEach 
    println(it)
  

// 方法二、使用 Sequence
private fun myMethod1(): Sequence<Int> = sequence 
  for(i in 100..105)
    Thread.sleep(1000) // 阻塞主线程
    yield(i) // 返回一个元素
  
 
fun main() 
  myMethod1().forEach println(it)  // 打印 100 ~ 105

// 方法三、使用协程
private suspend fun myMethod2():List<String>
  delay(1000)
  return listOf("How are you doing", "Not bad","Thank you", "How about you")

fun main()=runBlocking<Unit> 
  myMethod2().forEach println(it)  // 每一秒打印一个字符串

// 方法四、使用 Flow
fun myMethod3():Flow<Int> = flow // 调用 flow 构建器,自动变成挂起函数(无需显式使用 suspend 关键字),因而可以调用其他挂起函数
  for (i in 1..4)
    delay(1000)
    emit(i) // 类似 yield,异步返回一个结果
  

fun main() = runBlocking<Unit> 
  launch
    for (i in 1..2) 
      println("group $i ----")
      delay(2000)
    
  
  myMethod3().collect  println(it)  // collect 用于接收 emit 返回的一个结果,二者是配对的

第一种方法的特点是:

  • myMethod 方法是阻塞的
  • 集合中所有值只能全部求解后一次性返回,不分先后

第二种方法的特点是:

  • Thread.sleep 模拟阻塞方式的求解过程

  • sequence 中每 yield 一次,就返回给调用者一次,所以5条打印语句并不是一次性打印出来的,而是每隔一秒打印出一个数字。

第三种方法的特点是:

  • delay 是异步的,模拟异步求解过程,它不会阻塞主线程
  • myMethod2 是挂起函数,所以只能在另外一个挂起函数或协程中调用,因此 main() 函数使用了 runBlocking
  • 但是仍然是一次性返回所有结果

第四种方法的特点是:

  • 求解过程是异步的,不会阻塞线程

  • 返回过程是异步的,每 emit 一次就返回一个,而不是全部一次性返回

    因此打印结果会先打印 group 1 ----,然后是数字 1 和 2(间隔1秒),2 秒后是group 2 ----,然后是 3 和 4(间隔1秒):

    group 1 ----
    1
    2
    group 2 ----
    3
    4
    

可以看出 Flow 非常类似于 Sequence,但是它是异步执行的,而 Sequence 是同步执行的。

此外,如果将 Flow 中的 delay 换成 Thread.sleep,则 Flow 的 emit 失去作用,异步返回变成一次性返回:

1
2
3
4
group 1 ----
group 3 ----

很显然,Thread.sleep 阻塞了主线程的执行。

Flow 构建器

Flow 通过构建器进行构建,它有 4 种构建器:

  • flowOf(…)
  • asFlow(…)
  • flow…
  • channelFlow …

之前我们已经使用的就是 flow… 构建器 。它的使用较为简单。接下来我们看一下另外3个构建器。

  • flowOf

    定义一个发射固定数量值的流。它接收一个可变参数的值,并对这个可变参数进行循环 emit。

fun main() = runBloking 
	flowOf(10,20,29,30).collect println(it) 

  • asFlow

    集合和序列都提供了 asFlow 扩展方法,可以将自身转换为一个 flow 流。

fun main() = runBlocking 
	(1..10).asFlow().collect println(it) 

中间操作和终止操作

中间操作不会导致 Flow 中的代码被执行,比如 emit 就是一种中间操作,它不会导致代码真正被执行。而终止操作才会执行代码,collect 就是一种终止操作。

private fun myMethod():Flow<Int> = flow
	println("I'm fine.")
	for(i in 1..3) 
		delay(1000)
		emit(i)
	

fun main() = runBlocking<Unit> 
  println("Let it go!")
  val flow = myMethod() // 调用方法并不会导致代码被执行
  println("See you.")

打印结果是“I’m fine."一句并不会被打印:

Let it go!
See you.

要想让 flow 真正执行,需要加上:

flow.collect println(it) 

注意,如果多次调用终止操作,将会导致 flow 多次执行。

Flow 的中间操作中可以调用挂起函数的,这与 Sequence 是不同的。

private suspend fun myExecution(input:Int): String
	delay(1000)
	return "output:$input"

fun main() = runBlocking 
  (1..10).asFlow()
  .filter it -> it > 5 
  .map input -> myExecution(input) // 调用挂起函数 myExecution
  .collect println(it) 

输出结果如下:

output:6
output:7
...
output:10

除了 filter 和 map,Flow 还支持 transform 操作:

private suspend fun myExecution(input:Int): String
	delay(1000)
	return "output: $input"

fun main() = runBlocking 
	(1..10).asFlow().transform input -> 
                             emit( "transform: $input")
                             emit(myExecution(input))
                             emit("------")
                            .collect println(it) 

输出结果如下:

transform: 1
output: 1
------
transform: 2
output: 2
------
...
transform: 10
output: 10
------

transform 中可以执行任何逻辑,不需要返回任何值,如果需要向 flow 发送值,可以使用 emit (可以发射多次)。它比 filter 或 map 更自由和强大。

中间操作中可以限定元素数量:

fun myNumbers():Flow<Int> = flow
  try 
		emit(1)
 	 emit(2)
  	println("----")
 	 emit(3)
  catch(e: Exception) 
		println(e)
  finally
    println("--finally")
  
	

fun main() = runBlocking<Uint> 
	myNumbers().take(2).collect( println(it) )// take(2) 仅获取前 2 个元素

输出结果如下:

1
2
kotlinx.coroutines.flow.interal.AbortFlowException: Flow was aborted, no more elements needed
finally

可以看到当 take 到指定元素后,flow 会直接抛出一个异常,从而导致流被取消。

所有的终止操作都是挂起函数。终止操作才会真正执行流的代码。除了 collect 操作,flow 还有其他终止操作 ,比如 toList,toSet,reduce 等。

fun main() = runBlocking 
	val result = (1..4).asFlow().map( it*it )
	.reduce( a,b -> a+b ) // 汇聚操作,将元素值进行累加
	println(result)

输出结果为 30,因为 1+4+9+16 = 30。

Flow 是顺序执行的。collect 操作运行在终止操作的协程,默认不会开启新的协程。每个 emit 的元素都会由所有的中间操作进行处理,最后由终止操作处理。

fun main() = runBlocking 
	(1..10).asFlow().filter // 只允许偶数元素通过,奇数被过滤
    it % 2 == 0
  .map // 偶数元素进入 map 操作
    println("map: $it")
    it
  .collect 
    println("collect: $it")
  

输出结果如下:

filter: 2
map: 2
collect: 2
filter: 4
map: 4
collect: 4
...
filter: 10
map: 10
collect: 10

以上是关于深入kotlin - 初识 Flow的主要内容,如果未能解决你的问题,请参考以下文章

深入kotlin - 初识 Flow

深入kotlin - 初识 Flow

深入理解Kotlin协程Google的工程师们是这样理解Flow的?

深入kotlin - Flow 进阶

深入kotlin - Flow 进阶

深入kotlin - Flow 进阶