对比Java学Kotlin协程-异步流
Posted 陈蒙_
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了对比Java学Kotlin协程-异步流相关的知识,希望对你有一定的参考价值。
文章目录
异步流定义
我们知道有 RxJava、RxSwift、RxJs,那有 RxKotlin 吗?答案是没有。因为不需要,Kotlin 内置了 「RxKotlin」,天生拥有响应式编程能力,这就是 Kotlin 异步流。
类比 RxJava = 响应式编程+线程,有 Kotlin 异步流 = RxKotlin + 协程。
RxJava 支持切换线程,而Kotlin异步流切换的是协程。
既然是“异步”流,那有“同步”流吗?有的,Kotlin 中的同步流就是 Sequence。
跟 RxJava、Project Reactor 一样,Kotlin Flow 也是响应式编程规范(Reactive Streams Specification)的一种实现。
通过 launch()、async() 这些创建协程的方法可以获取单个异步执行的实例。那如果我们想同时获取多个呢?或者说获取一组异步执行的实例呢?这个时候异步流就派上用场了。
响应式编程与异步流
什么是响应式编程?
抛开具体编程语言,我们看一个赋值语句,涉及 a、b、c 三个变量:
var a = 1
var b = a + 1
a = 2
print(a, b)
试问,a 的值发生改变之后,b 的值会跟着变吗?根据我们的经验,一般 b 是不会变的。其实我们可以设计一门编程语言,让 b 跟着 a 变,这时这个编程语言就是天然支持响应式编程的。
从这个例子我们可以看出,响应式编程的精髓是「响应」,即一方随时「响应」另一方,二者之间可能是依赖关系。
这种响应关系就像水流(flow 或 stream)的上游和下游一样,上游发生变化,下游跟着变化。
操作符
异步流的操作符可以按不同的标准进行分类,比如按出现位置可以分为开头型、中间型和结尾型,比如构建异步流的操作符一般出现在流的开头,而启动型的操作符出现在流的结尾,其他的如转换型的操作符出现在流的中间位置。
构建型
Kotlin 提供了 flow()
、flowOf()
、asFlow()
构造异步流。后面两个方法本质上还是借助 flow()
完成的。Koltin 异步流中最基本的方法是 flow()、collect() 和 emit()。其中 emit()
方法类似 RxJava 种的 onNext(),每个操作符处理完上游传递给自己的数据然后发射/emit出去,就完成了向下游的传递。
- flow(),接收一个 FlowCollector emit() 类型的 lambda 表达式,返回一个 Flow collect() 的 lambda 表达式;
- collect(),相当于下游水龙头,调用 collect() 时触发上游开始执行;
- emit(),上游产生数据传递给下游;
使用这几个方法,我们就可构造简单的异步流了,举个例子:
fun main() = runBlocking
launch
for (i in 1..4)
println("i am not blocked $i")
delay(100)
simpleFlow().collect value ->
println(value)
fun simpleFlow(): Flow<Int> = flow
for (i in 1..4)
delay(1000)
emit(i)
上述代码输出结果为:
i am not blocked 1
i am not blocked 2
i am not blocked 3
i am not blocked 4
1
2
3
4
需要注意以下几点:
- simpleFlow() 方法没有 suspend 修饰,因为该方法本身并非一个耗时方法,而是直接返回的同步方法;
- 通过 emit() 方法发射数据到下游,作用跟普通方法中的 return 和 Sequence 中的 yield() 类似;
- 只有在调用 collect() 方法时数据流才会启动运行,而且每次调用都会从头执行一遍,否则其前面的其他操作符方法不会执行;
- collect() 方法是 suspend 的,必须在另一个 suspend 方法中被调用;
- 数据流可以看做是一组值,每个值的执行也就是流经操作符的顺序是串行的;
关于最后一点,我们看个例子:
fun main() = runBlocking
(1..5).asFlow().filter
println("filtering $it")
it % 2 == 0
.map
println("mapping $it")
"string $it"
.collect
println("collected $it")
结果输出为:
filtering 1
filtering 2
mapping 2
collected string 2
filtering 3
filtering 4
mapping 4
collected string 4
filtering 5
从结果可以看出,每个值都依次经过了 filter()、map() 和 collect()。奇数经过 filter() 时返回 false,就会被被拦下从而没有触发 emit() 方法往下一个操作符发射数据,filter() 和 map() 方法本质上是 inline 方法,最后都会被转换成 transform() 方法,当 filter 条件命中返回 true 时才会调用 emit() 方法,map() 则是无脑调用 emit()。
emit() 方法可以被调用多次:
fun main() = runBlocking
(1..4).asFlow()
.transform request ->
emit("making req $request")
emit(performReq(request))
.collect response -> println(response)
suspend fun performReq(req: Int): String
delay(1000)
return "response $req"
上述代码输出:
making req 1
response 1
making req 2
response 2
making req 3
response 3
making req 4
response 4
上下文型
flowOn()
异步流的上下文是指,异步流执行在哪个协程。我们可以粗略的将异步流分为上游和下游两部分,那么上下文要关注的是上游和下游分别运行在哪个协程中。
最常见的场景是,上游是数据生产者,是耗时操作,比如网络请求等,而下游是消费者,比如在主线程中更新UI等。
下游代码,即 collect() 对应的代码,如果没有特殊指定,默认运行在调用该方法的协程中。
我们可以使用 flowOn() 来改变上游代码运行所在的协程,一般是在构造协程的时候。
fun /*context_*/main() = runBlocking
simple().flowOn(Dispatchers.Default).collect value -> log("Collected $value")
fun simple() = flow
for (i in 1..3)
delay(100)
log("Emitting $i")
emit(i)
fun log(msg: String) = println("[$Thread.currentThread().name] $msg")
协作型
Flow的上游好比是泵,下游好比是水龙头。上游抽水的速度和水龙头放水的速度可能不一致,如果想让二者协同合作以达到最高效率或实现某种效果,就需要使用协作型操作符了。
buffer
一般情况下,只有下游完成处理后才会告诉上游继续发射下一个数据。但是如果上游的发射速度高于下游,是否可以不让上游等待呢?这时我们可以使用 buffer(),让上游实现并发。
可以简单的认为,有了buffer之后上游就放飞自我了,发射时机不再受下游限制了,反正提前发射的值可以被缓存(buffer)。
比如上游发射一个数据需要100ms,下游处理一个数据需要200ms:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.buffer
import kotlinx.coroutines.flow.cancellable
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.runBlocking
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@OptIn(ExperimentalTime::class)
fun main(): Unit = runBlocking
val elapse = measureTime
simpleFlow()/*.buffer()*/.collect
println("consuming $it")
delay(200)
println("consuming done")
println("all done")
println("elapse: $elapse")
fun simpleFlow() = flow
for (i in 1..3)
println("producing $i")
delay(100)
emit(i)
println("producing done")
使用buffer前后的耗时情况如下图:
所以未使用buffer时耗时900ms左右,使用buffer之后耗时在700ms左右。
conflate
在上游发射速度快于下游处理速度时,conflate() 可以让我们丢弃中间值,而仅仅处理最新的值。被丢弃的前提是下游还没来得及处理,如果已经开始处理了,则不会被丢弃。当下游的处理耗时是300ms时中间值会被丢弃,而150ms时则会被正常处理:
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@OptIn(ExperimentalTime::class)
fun main(): Unit = runBlocking
var delay = 150L // 300L
val elapse = measureTime
simpleFlow()/*.buffer()*/.conflate().collect
println("consuming $it")
delay(delay)
println("consuming $it done")
println("all done")
println("elapse: $elapse")
fun simpleFlow() = flow
for (i in 1..3)
println("producing $i")
delay(100)
emit(i)
println("producing $i done")
150 的输出:
producing 1
producing 1 done
producing 2
consuming 1
producing 2 done
producing 3
consuming 1 done
consuming 2
producing 3 done
consuming 2 done
consuming 3
consuming 3 done
all done
elapse: 616.934055ms
Process finished with exit code 0
300的输出:
producing 1
producing 1 done
producing 2
consuming 1
producing 2 done
producing 3
producing 3 done
consuming 1 done
consuming 3
consuming 3 done
all done
elapse: 767.395294ms
如果上游和下游速度都慢,我们可以使用 conflate 来提升处理速度,本质上是丢弃中间值。另一种方式是每当上游发射一个值时,下游都取消当前正在处理的任务,重新启动一个新任务处理最新值,这类操作符统称为 xxLatest。
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlin.time.ExperimentalTime
import kotlin.time.measureTime
@OptIn(ExperimentalTime::class)
fun main(): Unit = runBlocking
var delay = 150L
val elapse = measureTime
simpleFlow()/*.buffer().conflate()*/.collectLatest
println("consuming $it")
delay(delay)
println("consuming $it done")
println("all done")
println("elapse: $elapse")
fun simpleFlow() = flow
for (i in 1..3)
println("producing $i")
delay(100)
emit(i)
println("producing $i done")
从输出可以看出,虽然下游处理任务启动了,但是还没有完成的时候就被取消了(没有 consuming 2 done):
producing 1
consuming 1
producing 1 done
producing 2
consuming 2
producing 2 done
producing 3
consuming 3
producing 3 done
consuming 3 done
all done
elapse: 509.858319ms
转换型
- transform
- map
- filter
- take
- reduce
我们可以对流做一些转换操作,把发射出来的值转换成其他类型或值。比如最基本的 map()、filter() 操作符,这些操作符支持 suspend 方法,里面可以是耗时操作:
fun /*transformation_*/main() = runBlocking
(1..3).asFlow().map request -> performRequest(request)
.collect response -> println(response)
suspend fun performRequest(request: Int): String
delay(1000)
return "response $request"
输出为:
(delay 1 sec)
response 1
(delay 1 sec)
response 2
(delay 1 sec)
response 3
map()、filter()、transform() 都是 Flow 的扩展方法,map()和filter()本质上是调用 transform() 实现的,transform() 自己又构造了一个 flow:
public inline fun <T, R> Flow<T>.transform(
@BuilderInference crossinline transform: suspend FlowCollector<R>.(value: T) -> Unit
): Flow<R> = flow // Note: safe flow is used here, because collector is exposed to transform on each operation
collect value ->
// kludge, without it Unit will be returned and TCE won't kick in, KT-28938
return@collect transform(value)
数量型
我们可以使用 take() 操作符只取前几个发射的值,后续的值将会被cancel掉,同时抛出异常:
fun /*sizeLimiting_*/main() = runBlocking
numbers()
.take(2)
.collect value -> println(value)
fun numbers() = flow
try
emit(1)
emit(2)
println("this line will not be executed")
emit(3)
catch (e: Throwable)
println(e.message)
finally
println("finally in numbers")
输出:
1
2
Flow was aborted, no more elements needed
finally in numbers
启动型
collect() 也是一个操作符。除了 collect(),还有哪些操作符能启动一个异步流吗?有的,大概有如下几类:
- 转成集合,比如 toList、toSet;
- 获取第一个发射值以及保证只有一个发射值(无发射值或多于一个则会抛异常)的操作符,如 first()和single();
在这里插入代码片
- 将所有发射值缩减至一个值的操作符,如 reduce()、fold();
- 新启协程运行异步流,比如 launchIn();
这些操作符有如下特点:
- 只能出现在异步流的最后,不能与 collect() 同时出现;
- 都是 suspend 方法;
fun /*trigger_*/main() = runBlocking
val res = (1..5).asFlow()
.map it * it
// .single()
// .first()
.reduce a, b -> a + b
// .fold(1) acc: Int, value: Int -> acc + value
println(res)
输出:55
launchIn() 操作符比较特殊,使用它可以另外启动一个协程用于运行当前异步流。一般 collect() 方法启动的异步流会直接使用当前协程运行,但是就导致 collect() 方法后面的代码需要等待,等待异步流执行完成后才能开始执行。如果我们希望 collect() 方法立即执行,那么可以使用 launchIn()。比如使用 collect() 方法启动异步流时:
fun /*launch_*/main() = runBlocking
(1..3).asFlow().onEach delay(100) .onEach value -> println(value) /*.launchIn(this)*/.collect()
println("DONE")
输出为:
1
2
3
DONE
显然异步流后面的代码是等待异步流执行完成之后才执行的。如果我们使用 launchIn():
fun /*launch_*/main() = runBlocking
(1..3).asFlow().onEach delay(100) .onEach value -> println(value) .launchIn(this)/*.collect()*/
println("DONE")
输出为:
DONE
1
2
3
launchIn() 会另外开启一个新的协程运行异步流的代码,与 printlin("DONE")
是同时运行。其效果相当于:
fun /*launch_*/main() = runBlocking
launch (1..3).asFlow().onEach delay(100) .onEach value -> println(value) .collect()
println("DONE")
组合型
- zip
- combine
- flatten
zip() 可以把两个flow组合起来:
import kotlinx.coroutines.flow.asFlow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.flow.zip
import kotlinx.coroutines.runBlocking
fun /*compose_*/main() = runBlocking
val keys = flowOf("one", "two", "three", "four").onEach delay(300)
val values = (1..3).asFlow().onEach delay(400)
keys.zip(values) a, b -> "$a -> $b" .collect println(it)
输出:
one -> 1
two -> 2
three -> 3
执行流程示意图:
combine()
combine 的作用是,每当其中一个流有最新值时,该最新值都会与其他流的最新值重新执行一遍处理流程。
fun /*compose_*/main() = runBlocking
val keys = flowOf("one", "two", "three", "four").onEach delay(1000)
val values = (1..3).asFlow().onEach delay(2000)
keys/*.zip*/.combine(values) a, b -> "$a -> $b" .collect value -> println("$value")
每隔2000ms输出一行:
one -> 1
two -> 1
three -> 1
three -> 2
four -> 2
four -> 3
执行流程为:
flatten
在实际开发中,有一种场景是一个flow发射的每一个值会使用map()操作符产生一个新的flow,于是就有了一个流组成的流(a flow of flows):
fun /*flatten_*/main() = runBlocking
(1..3).asFlow()以上是关于对比Java学Kotlin协程-异步流的主要内容,如果未能解决你的问题,请参考以下文章