Kotlin 协程Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )
Posted 韩曙亮
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )相关的知识,希望对你有一定的参考价值。
文章目录
一、流的上下文
1、上下文保存
Flow 异步流 收集元素 的操作 , 一般是在 协程上下文 中进行的 , 如 : 在协程中调用 Flow#collect 函数 , 收集元素 ;
收集元素 时 的 协程上下文 , 会 传递给 发射元素 的 流构建器 , 作为 流构建器的 上下文 ;
Flow 异步流 在 收集元素 时 , 才调用 流构建器 中的代码 , 收集元素操作在协程中执行 , 流构建器 也同样在相同的协程中运行 ;
流收集元素 和 发射元素 在相同的协程上下文中 的 属性 , 称为 上下文保存 ;
2、流收集函数原型
Flow#collect 函数原型如下 : Flow#collect 函数 由 suspend 关键字修饰 , 该函数是 suspend 挂起函数 , 因此 该函数必须在 协程中调用 ;
public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
collect(object : FlowCollector<T>
override suspend fun emit(value: T) = action(value)
)
3、流发射函数原型
Flow 异步流的 构建器 函数 : 流构建器 不是 suspend 挂起函数 , 可以在普通的线程中运行 , 不必在协程中运行 ;
- flow 构建器 :
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
- asFlow 构建器 :
@FlowPreview
public fun <T> (() -> T).asFlow(): Flow<T> = flow
emit(invoke())
- flowOf 构建器 :
public fun <T> flowOf(vararg elements: T): Flow<T> = flow
for (element in elements)
emit(element)
4、代码示例 - 查看流发射和收集的协程
代码示例 : 在 流收集 时 和 流构建时 , 分别打印线程名称 , 查看是在哪个线程中执行的 ;
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
class MainActivity : AppCompatActivity()
override fun onCreate(savedInstanceState: Bundle?)
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 携程中调用挂起函数返回一个 Flow 异步流
runBlocking
println("流收集时的协程上下文 : $Thread.currentThread().name")
// 调用 Flow#collect 函数, 可以获取在异步流中产生的元素
flowFunction().collect
// 每隔 500ms 即可拿到一个 Int 元素
// 并且该操作是异步操作, 不会阻塞调用线程
println(it)
/**
* 使用 flow 构建器 Flow 异步流
* 在该异步流中, 异步地产生 Int 元素
*/
suspend fun flowFunction() = flow<Int>
println("流构建器的上下文 : $Thread.currentThread().name")
for (i in 0..2)
// 挂起函数 挂起 500ms
// 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
// 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
delay(500)
// 每隔 500ms 产生一个元素
// 通过调用 FlowCollector#emit 生成一个元素
emit(i)
执行结果 : 最终执行时 , 流构建器和流收集 都是在 主线程中执行的 , 这是 由 runBlocking 协程构建器 将 主线程 包装后的 协程 ;
2022-12-23 14:29:06.315 17484-17484/kim.hsl.coroutine I/System.out: 流收集时的协程上下文 : main
2022-12-23 14:29:06.323 17484-17484/kim.hsl.coroutine I/System.out: 流构建器的上下文 : main
2022-12-23 14:29:06.875 17484-17484/kim.hsl.coroutine I/System.out: 0
2022-12-23 14:29:07.399 17484-17484/kim.hsl.coroutine I/System.out: 1
2022-12-23 14:29:07.940 17484-17484/kim.hsl.coroutine I/System.out: 2
5、代码示例 - 不能在不同协程中执行相同流的发射和收集操作
在流构建器中 , 将代码定义在如下协程中执行 , 使用 Dispatchers.IO 调度器 , 也就是协程在子线程中执行 ;
withContext(Dispatchers.IO)
在流收集时 , 在 使用 runBlocking 将主线程包装后的 协程 中 , 收集元素 , 协程在主线程中执行 ;
runBlocking
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext
class MainActivity : AppCompatActivity()
override fun onCreate(savedInstanceState: Bundle?)
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 携程中调用挂起函数返回一个 Flow 异步流
runBlocking
println("流收集时的协程上下文 : $Thread.currentThread().name")
// 调用 Flow#collect 函数, 可以获取在异步流中产生的元素
flowFunction().collect
// 每隔 500ms 即可拿到一个 Int 元素
// 并且该操作是异步操作, 不会阻塞调用线程
println(it)
/**
* 使用 flow 构建器 Flow 异步流
* 在该异步流中, 异步地产生 Int 元素
*/
suspend fun flowFunction() = flow<Int>
// 在后台线程中发射元素
withContext(Dispatchers.IO)
println("流构建器的上下文 : $Thread.currentThread().name")
for (i in 0..2)
// 挂起函数 挂起 500ms
// 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
// 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
delay(500)
// 每隔 500ms 产生一个元素
// 通过调用 FlowCollector#emit 生成一个元素
emit(i)
执行程序后的报错信息如下 :
2022-12-23 14:39:05.805 19710-19710/kim.hsl.coroutine I/System.out: 流收集时的协程上下文 : main
2022-12-23 14:39:05.850 19710-19738/kim.hsl.coroutine I/System.out: 流构建器的上下文 : DefaultDispatcher-worker-1
2022-12-23 14:39:06.436 19710-19710/kim.hsl.coroutine D/AndroidRuntime: Shutting down VM
2022-12-23 14:39:06.462 19710-19710/kim.hsl.coroutine E/AndroidRuntime: FATAL EXCEPTION: main
Process: kim.hsl.coroutine, PID: 19710
java.lang.RuntimeException: Unable to start activity ComponentInfokim.hsl.coroutine/kim.hsl.coroutine.MainActivity: java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutineActive@daf39f2, BlockingEventLoop@a6f9843],
but emission happened in [DispatchedCoroutineActive@8ba6ec0, Dispatchers.IO].
Please refer to 'flow' documentation or use 'flowOn' instead
at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2951)
at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3086)
at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1816)
at android.os.Handler.dispatchMessage(Handler.java:106)
at android.os.Looper.loop(Looper.java:193)
at android.app.ActivityThread.main(ActivityThread.java:6718)
at java.lang.reflect.Method.invoke(Native Method)
at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)
Caused by: java.lang.IllegalStateException: Flow invariant is violated:
Flow was collected in [BlockingCoroutineActive@daf39f2, BlockingEventLoop@a6f9843],
but emission happened in [DispatchedCoroutineActive@8ba6ec0, Dispatchers.IO].
Please refer to 'flow' documentation or use 'flowOn' instead
at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84)
at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:88)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:74)
at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)
at kim.hsl.coroutine.MainActivity$flowFunction$2$1.invokeSuspend(MainActivity.kt:43)
at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
2022-12-23 14:39:06.505 19710-19710/kim.hsl.coroutine I/Process: Sending signal. PID: 19710 SIG: 9
二、修改流发射的协程上下文
在上述 流的收集 和 流的发射 都 必须在同一个协程中执行 , 这样并不是我们想要的 ;
如 : 下载时 , 想要在后台线程中下载 , 在主线程中更新 UI , 那么对应 Flow 异步流应该是在 后台线程中 发射元素 , 在主线程中 收集元素 ;
使用 flowOn 操作符 , 可以修改 流发射 的协程上下文 , 不必必须在 流收集 的协程上下文中执行 流发射操作 ;
1、Flow#flowOn 函数原型
Flow#flowOn 函数原型如下 :
/**
* 将此流执行的上下文更改为给定的[context]。
* 此操作符是可组合的,仅影响前面没有自己上下文的操作符。
* 这个操作符是上下文保护的:[context] **不会**泄漏到下游流中。
*
* 例如:
*
* ```
* withContext(Dispatchers.Main)
* val singleValue = intFlow // will be executed on IO if context wasn't specified before
* .map ... // Will be executed in IO
* .flowOn(Dispatchers.IO)
* .filter ... // Will be executed in Default
* .flowOn(Dispatchers.Default)
* .single() // Will be executed in the Main
*
* ```
*
* 有关上下文保存的更多说明,请参考[Flow]文档。
*
* 如果更改上下文不需要更改,则此操作符保留流的_sequential_性质
* (调度)[CoroutineDispatcher]。否则,如果需要更改dispatcher,它将进行收集
* 使用指定[上下文]运行的协同例程中的流发射,并从另一个协同例程中发射它们
* 使用带有[default][channel]的通道与原始收集器的上下文连接。BUFFERED]缓冲区大小
* 在两个协程之间,类似于[buffer]操作符,除非显式调用[buffer]操作符
* 在' flowOn '之前或之后,请求缓冲行为并指定通道大小。
*
* 注意,跨不同调度程序操作的流在取消时可能会丢失一些正在运行的元素。
* 特别是,该操作符确保下游流不会在取消时恢复,即使元素
* 已经被上游的气流释放出来了。
*
* ###算子融合
*
* 相邻的[channelFlow]、[flowOn]、[buffer]和[produceIn]的应用是
* 始终融合,以便只有一个正确配置的通道用于执行。
*
* 多个“flowOn”操作符融合到一个具有组合上下文的单一“flowOn”。上下文的要素
* 第一个' flowOn '操作符自然优先于第二个' flowOn '操作符的元素
* 当它们具有相同的上下文键时,例如:
*
* ```
* flow.map ... // Will be executed in IO
* .flowOn(Dispatchers.IO) // This one takes precedence
* .flowOn(Dispatchers.Default)
* ```
*
* 请注意,[SharedFlow]的实例本身没有执行上下文,
* 所以应用' flowOn '到' SharedFlow '没有效果。参见[SharedFlow]关于Operator Fusion的文档。
*
* @throws [IllegalArgumentException] 如果所提供的上下文包含[Job]实例。
*/
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T>
checkFlowContext(context)
return when
context == EmptyCoroutineContext -> this
this is FusibleFlow -> fuse(context = context)
else -> ChannelFlowOperatorImpl(this, context = context)
2、代码示例
代码示例 :
package kim.hsl.coroutine
import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking
class MainActivity : AppCompatActivity()
override fun onCreate(savedInstanceState: Bundle?)
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// 携程中调用挂起函数返回一个 Flow 异步流
runBlocking
println("流收集时的协程上下文 : $Thread.currentThread().name")
// 调用 Flow#collect 函数, 可以获取在异步流中产生的元素
flowFunction().collect
// 每隔 500ms 即可拿到一个 Int 元素
// 并且该操作是异步操作, 不会阻塞调用线程
println(it)
/**
* 使用 flow 构建器 Flow 异步流
* 在该异步流中, 异步地产生 Int 元素
*/
suspend fun flowFunction() = flow<Int>
println("流构建器的上下文 : $Thread.currentThread().name")
for (i in 0..2)
// 挂起函数 挂起 500ms
// 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
// 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
delay(500)
// 每隔 500ms 产生一个元素
// 通过调用 FlowCollector#emit 生成一个元素
emit(i)
.flowOn(Dispatchers.IO)
执行结果 : 没有报错 , 并且 流发射 在子线程中执行 , 流收集 在 主线程中执行 ;
2022-12-23 14:50:32.925 21339-21339/kim.hsl.coroutine I/System.out: 流收集时的协程上下文 : main
2022-12-23 14:50:32.991 21339-21374/kim.hsl.coroutine I/System.out: 流构建器的上下文 : DefaultDispatcher-worker-2
2022-12-23 14:50:33.512 21339-21339/kim.hsl.coroutine I/System.out: 0
2022-12-23 14:50:34.038 21339-21339/kim.hsl.coroutine I/System.out: 1
2022-12-23 14:50:34.583 21339-21339/kim.hsl.coroutine I/System.out: 2
以上是关于Kotlin 协程Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )的主要内容,如果未能解决你的问题,请参考以下文章
Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )
Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )
Kotlin 协程Flow 异步流 ③ ( 冷流 | 流被收集时运行 | 流的连续性 )
Kotlin 协程Flow 异步流 ③ ( 冷流 | 流被收集时运行 | 流的连续性 )
Kotlin 协程Flow 异步流 ⑦ ( 调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测 | 启用检测 Flow 流的取消cancellable函数 )
Kotlin 协程Flow 异步流 ⑦ ( 调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测 | 启用检测 Flow 流的取消cancellable函数 )