深入kotlin - 初识 Flow
Posted 颐和园
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了深入kotlin - 初识 Flow相关的知识,希望对你有一定的参考价值。
asContextElement
ThreadLocal 是 Java 中线程间数据访问冲突中的一种解决方式,常用于取代锁。其会将线程间共享的数据复制为多份(每个线程的 ThreadLocal 数据维护在一张 map 表中,其中的 key 就是每个线程对象。这张 map 中,key 是 ThreadLocal 自己,value 是复制后的数据)。这样,每个线程操作一份数据,从而解决访问冲突。
但在协程环境中,这个问题变得复杂。因为协程并没有和线程绑定,一个协程在执行过程中是可以切换线程的(例如之前 Dispatchers.Unconfined 的例子)。
Kotlin 通过一系列的扩张方法解决这个问题。当协程从线程 A 切换到 B 然后又回到 A,线程 A 的 ThreadLocal 属性自动恢复。这就是 asContextElement(value:) 方法,它是 ThreadLocal 的扩展方法,用于将一个 ThreadLocal 包装成 ThreadContextElement。ThreadContextElement 会将 ThreadLocal 值复制到协程中,但不与特定线程绑定,value 参数会覆盖 ThreadLocal 中的 value。
来看这个例子。
val threadLocal = ThreadLocal<String>()
fun main() = runBlocking<Unit>
threadLocal.set("Jim")
println("$Thread.currentThread(): $threadLocal.get()") // 打印:Thread[main @coroutine#1,5,main]: Jim
val job = launch(Dispatchers.Default+threadLocal.asContextElement(value="Ann"))// ThreadLocal -> ThreadContextElement,同时覆盖 threadLocal 的 value,因此,threadLocal 的值会从 Jim 变成 Ann。
println("$Thread.currentThread(): $threadLocal.get()") // 打印:Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
yield() // 将协程切换到(线程池中的)其它线程执行
println("$Thread.currentThread(): $threadLocal.get()") // 打印:Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
job.join() // 回到主协程,threadLocal 值自动恢复
println("$Thread.currentThread(), $threadLocal.get()") // 打印:Thread[main @coroutine#1,5,main]: Jim
程序的整个输出如下(打开 coroutines.debug 开关):
Thread[main @coroutine#1,5,main]: Jim
Thread[DefaultDispatcher-worker-1 @coroutine#2,5,main]: Ann
Thread[DefaultDispatcher-worker-2 @coroutine#2,5,main]: Ann
Thread[main @coroutine#1,5,main]: Jim
ThreadContextElement 不会跟踪 thread local 的值,它只是原来值的拷贝,所以对它的任何修改都不会影响原来 thread local 的值。
如果你在协程中直接使用了 thread local(没有使用 asContextElement 方法),并修改了 thread local 的值,则 thread local 的值可能会变得不确定,如:
val t1 = ThreadLocal.withInitial "initial" runBlocking // 注意,这里没有用 asContextElement(value:) 方法 println(t1.get()) // 打印:initial withContext(t1.asContextElement("modified")) println(t1.get()) // 打印:modified // 返回原来的上下文 println(t1.get())// 可能打印:initial,也可能打印:modified,不确定
Flow
Flow 表示一个异步流,类似于 Java stream。如果一个函数需要返回多个值,除了使用集合外,还可以返回 Flow:
// 方法一、使用集合
private fun myMethod(): List<String> = listOf("How are you doing", "Not bad","Thank you", "How about you")
fun main()
myMethod().forEach
println(it)
// 方法二、使用 Sequence
private fun myMethod1(): Sequence<Int> = sequence
for(i in 100..105)
Thread.sleep(1000) // 阻塞主线程
yield(i) // 返回一个元素
fun main()
myMethod1().forEach println(it) // 打印 100 ~ 105
// 方法三、使用协程
private suspend fun myMethod2():List<String>
delay(1000)
return listOf("How are you doing", "Not bad","Thank you", "How about you")
fun main()=runBlocking<Unit>
myMethod2().forEach println(it) // 每一秒打印一个字符串
// 方法四、使用 Flow
fun myMethod3():Flow<Int> = flow // 调用 flow 构建器,自动变成挂起函数(无需显式使用 suspend 关键字),因而可以调用其他挂起函数
for (i in 1..4)
delay(1000)
emit(i) // 类似 yield,异步返回一个结果
fun main() = runBlocking<Unit>
launch
for (i in 1..2)
println("group $i ----")
delay(2000)
myMethod3().collect println(it) // collect 用于接收 emit 返回的一个结果,二者是配对的
第一种方法的特点是:
- myMethod 方法是阻塞的
- 集合中所有值只能全部求解后一次性返回,不分先后
第二种方法的特点是:
-
Thread.sleep 模拟阻塞方式的求解过程
-
sequence 中每 yield 一次,就返回给调用者一次,所以5条打印语句并不是一次性打印出来的,而是每隔一秒打印出一个数字。
第三种方法的特点是:
- delay 是异步的,模拟异步求解过程,它不会阻塞主线程
- myMethod2 是挂起函数,所以只能在另外一个挂起函数或协程中调用,因此 main() 函数使用了 runBlocking
- 但是仍然是一次性返回所有结果
第四种方法的特点是:
-
求解过程是异步的,不会阻塞线程
-
返回过程是异步的,每 emit 一次就返回一个,而不是全部一次性返回
因此打印结果会先打印
group 1 ----
,然后是数字 1 和 2(间隔1秒),2 秒后是group 2 ----
,然后是 3 和 4(间隔1秒):group 1 ---- 1 2 group 2 ---- 3 4
可以看出 Flow 非常类似于 Sequence,但是它是异步执行的,而 Sequence 是同步执行的。
此外,如果将 Flow 中的 delay 换成 Thread.sleep,则 Flow 的 emit 失去作用,异步返回变成一次性返回:
1
2
3
4
group 1 ----
group 3 ----
很显然,Thread.sleep 阻塞了主线程的执行。
Flow 构建器
Flow 通过构建器进行构建,它有 4 种构建器:
- flowOf(…)
- asFlow(…)
- flow…
- channelFlow …
之前我们已经使用的就是 flow… 构建器 。它的使用较为简单。接下来我们看一下另外3个构建器。
-
flowOf
定义一个发射固定数量值的流。它接收一个可变参数的值,并对这个可变参数进行循环 emit。
fun main() = runBloking
flowOf(10,20,29,30).collect println(it)
-
asFlow
集合和序列都提供了 asFlow 扩展方法,可以将自身转换为一个 flow 流。
fun main() = runBlocking
(1..10).asFlow().collect println(it)
中间操作和终止操作
中间操作不会导致 Flow 中的代码被执行,比如 emit 就是一种中间操作,它不会导致代码真正被执行。而终止操作才会执行代码,collect 就是一种终止操作。
private fun myMethod():Flow<Int> = flow
println("I'm fine.")
for(i in 1..3)
delay(1000)
emit(i)
fun main() = runBlocking<Unit>
println("Let it go!")
val flow = myMethod() // 调用方法并不会导致代码被执行
println("See you.")
打印结果是“I’m fine."一句并不会被打印:
Let it go!
See you.
要想让 flow 真正执行,需要加上:
flow.collect println(it)
注意,如果多次调用终止操作,将会导致 flow 多次执行。
Flow 的中间操作中可以调用挂起函数的,这与 Sequence 是不同的。
private suspend fun myExecution(input:Int): String
delay(1000)
return "output:$input"
fun main() = runBlocking
(1..10).asFlow()
.filter it -> it > 5
.map input -> myExecution(input) // 调用挂起函数 myExecution
.collect println(it)
输出结果如下:
output:6
output:7
...
output:10
除了 filter 和 map,Flow 还支持 transform 操作:
private suspend fun myExecution(input:Int): String
delay(1000)
return "output: $input"
fun main() = runBlocking
(1..10).asFlow().transform input ->
emit( "transform: $input")
emit(myExecution(input))
emit("------")
.collect println(it)
输出结果如下:
transform: 1
output: 1
------
transform: 2
output: 2
------
...
transform: 10
output: 10
------
transform 中可以执行任何逻辑,不需要返回任何值,如果需要向 flow 发送值,可以使用 emit (可以发射多次)。它比 filter 或 map 更自由和强大。
中间操作中可以限定元素数量:
fun myNumbers():Flow<Int> = flow
try
emit(1)
emit(2)
println("----")
emit(3)
catch(e: Exception)
println(e)
finally
println("--finally")
fun main() = runBlocking<Uint>
myNumbers().take(2).collect( println(it) )// take(2) 仅获取前 2 个元素
输出结果如下:
1
2
kotlinx.coroutines.flow.interal.AbortFlowException: Flow was aborted, no more elements needed
finally
可以看到当 take 到指定元素后,flow 会直接抛出一个异常,从而导致流被取消。
所有的终止操作都是挂起函数。终止操作才会真正执行流的代码。除了 collect 操作,flow 还有其他终止操作 ,比如 toList,toSet,reduce 等。
fun main() = runBlocking
val result = (1..4).asFlow().map( it*it )
.reduce( a,b -> a+b ) // 汇聚操作,将元素值进行累加
println(result)
输出结果为 30,因为 1+4+9+16 = 30。
Flow 是顺序执行的。collect 操作运行在终止操作的协程,默认不会开启新的协程。每个 emit 的元素都会由所有的中间操作进行处理,最后由终止操作处理。
fun main() = runBlocking
(1..10).asFlow().filter // 只允许偶数元素通过,奇数被过滤
it % 2 == 0
.map // 偶数元素进入 map 操作
println("map: $it")
it
.collect
println("collect: $it")
输出结果如下:
filter: 2
map: 2
collect: 2
filter: 4
map: 4
collect: 4
...
filter: 10
map: 10
collect: 10
以上是关于深入kotlin - 初识 Flow的主要内容,如果未能解决你的问题,请参考以下文章