Kotlin 协程Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )

Posted 韩曙亮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Kotlin 协程Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )相关的知识,希望对你有一定的参考价值。

文章目录





一、流的上下文



1、上下文保存


Flow 异步流 收集元素 的操作 , 一般是在 协程上下文 中进行的 , 如 : 在协程中调用 Flow#collect 函数 , 收集元素 ;

收集元素 时 的 协程上下文 , 会 传递给 发射元素 的 流构建器 , 作为 流构建器的 上下文 ;

Flow 异步流 在 收集元素 时 , 才调用 流构建器 中的代码 , 收集元素操作在协程中执行 , 流构建器 也同样在相同的协程中运行 ;

流收集元素 和 发射元素 在相同的协程上下文中 的 属性 , 称为 上下文保存 ;


2、流收集函数原型


Flow#collect 函数原型如下 : Flow#collect 函数 由 suspend 关键字修饰 , 该函数是 suspend 挂起函数 , 因此 该函数必须在 协程中调用 ;

public suspend inline fun <T> Flow<T>.collect(crossinline action: suspend (value: T) -> Unit): Unit =
    collect(object : FlowCollector<T> 
        override suspend fun emit(value: T) = action(value)
    )

3、流发射函数原型


Flow 异步流的 构建器 函数 : 流构建器 不是 suspend 挂起函数 , 可以在普通的线程中运行 , 不必在协程中运行 ;

  • flow 构建器 :
public fun <T> flow(@BuilderInference block: suspend FlowCollector<T>.() -> Unit): Flow<T> = SafeFlow(block)
  • asFlow 构建器 :
@FlowPreview
public fun <T> (() -> T).asFlow(): Flow<T> = flow 
    emit(invoke())

  • flowOf 构建器 :
public fun <T> flowOf(vararg elements: T): Flow<T> = flow 
    for (element in elements) 
        emit(element)
    


4、代码示例 - 查看流发射和收集的协程


代码示例 : 在 流收集 时 和 流构建时 , 分别打印线程名称 , 查看是在哪个线程中执行的 ;

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 携程中调用挂起函数返回一个 Flow 异步流
        runBlocking 
            println("流收集时的协程上下文 : $Thread.currentThread().name")
            // 调用 Flow#collect 函数, 可以获取在异步流中产生的元素
            flowFunction().collect 
                // 每隔 500ms 即可拿到一个 Int 元素
                // 并且该操作是异步操作, 不会阻塞调用线程
                println(it)
            
        
    

    /**
     * 使用 flow 构建器 Flow 异步流
     * 在该异步流中, 异步地产生 Int 元素
     */
    suspend fun flowFunction() = flow<Int> 
        println("流构建器的上下文 : $Thread.currentThread().name")
        for (i in 0..2) 
            // 挂起函数 挂起 500ms
            // 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
            // 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
            delay(500)
            // 每隔 500ms 产生一个元素
            // 通过调用 FlowCollector#emit 生成一个元素
            emit(i)
        
    

执行结果 : 最终执行时 , 流构建器和流收集 都是在 主线程中执行的 , 这是 由 runBlocking 协程构建器 将 主线程 包装后的 协程 ;

2022-12-23 14:29:06.315 17484-17484/kim.hsl.coroutine I/System.out: 流收集时的协程上下文 : main
2022-12-23 14:29:06.323 17484-17484/kim.hsl.coroutine I/System.out: 流构建器的上下文 : main
2022-12-23 14:29:06.875 17484-17484/kim.hsl.coroutine I/System.out: 0
2022-12-23 14:29:07.399 17484-17484/kim.hsl.coroutine I/System.out: 1
2022-12-23 14:29:07.940 17484-17484/kim.hsl.coroutine I/System.out: 2


5、代码示例 - 不能在不同协程中执行相同流的发射和收集操作


在流构建器中 , 将代码定义在如下协程中执行 , 使用 Dispatchers.IO 调度器 , 也就是协程在子线程中执行 ;

withContext(Dispatchers.IO)

在流收集时 , 在 使用 runBlocking 将主线程包装后的 协程 中 , 收集元素 , 协程在主线程中执行 ;

runBlocking 

代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking
import kotlinx.coroutines.withContext

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 携程中调用挂起函数返回一个 Flow 异步流
        runBlocking 
            println("流收集时的协程上下文 : $Thread.currentThread().name")
            // 调用 Flow#collect 函数, 可以获取在异步流中产生的元素
            flowFunction().collect 
                // 每隔 500ms 即可拿到一个 Int 元素
                // 并且该操作是异步操作, 不会阻塞调用线程
                println(it)
            
        
    

    /**
     * 使用 flow 构建器 Flow 异步流
     * 在该异步流中, 异步地产生 Int 元素
     */
    suspend fun flowFunction() = flow<Int> 
        // 在后台线程中发射元素
        withContext(Dispatchers.IO)
            println("流构建器的上下文 : $Thread.currentThread().name")
            for (i in 0..2) 
                // 挂起函数 挂起 500ms
                // 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
                // 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
                delay(500)
                // 每隔 500ms 产生一个元素
                // 通过调用 FlowCollector#emit 生成一个元素
                emit(i)
            
        
    

执行程序后的报错信息如下 :

2022-12-23 14:39:05.805 19710-19710/kim.hsl.coroutine I/System.out: 流收集时的协程上下文 : main
2022-12-23 14:39:05.850 19710-19738/kim.hsl.coroutine I/System.out: 流构建器的上下文 : DefaultDispatcher-worker-1
2022-12-23 14:39:06.436 19710-19710/kim.hsl.coroutine D/AndroidRuntime: Shutting down VM
2022-12-23 14:39:06.462 19710-19710/kim.hsl.coroutine E/AndroidRuntime: FATAL EXCEPTION: main
    Process: kim.hsl.coroutine, PID: 19710
    java.lang.RuntimeException: Unable to start activity ComponentInfokim.hsl.coroutine/kim.hsl.coroutine.MainActivity: java.lang.IllegalStateException: Flow invariant is violated:
    		Flow was collected in [BlockingCoroutineActive@daf39f2, BlockingEventLoop@a6f9843],
    		but emission happened in [DispatchedCoroutineActive@8ba6ec0, Dispatchers.IO].
    		Please refer to 'flow' documentation or use 'flowOn' instead
        at android.app.ActivityThread.performLaunchActivity(ActivityThread.java:2951)
        at android.app.ActivityThread.handleLaunchActivity(ActivityThread.java:3086)
        at android.app.servertransaction.LaunchActivityItem.execute(LaunchActivityItem.java:78)
        at android.app.servertransaction.TransactionExecutor.executeCallbacks(TransactionExecutor.java:108)
        at android.app.servertransaction.TransactionExecutor.execute(TransactionExecutor.java:68)
        at android.app.ActivityThread$H.handleMessage(ActivityThread.java:1816)
        at android.os.Handler.dispatchMessage(Handler.java:106)
        at android.os.Looper.loop(Looper.java:193)
        at android.app.ActivityThread.main(ActivityThread.java:6718)
        at java.lang.reflect.Method.invoke(Native Method)
        at com.android.internal.os.RuntimeInit$MethodAndArgsCaller.run(RuntimeInit.java:493)
        at com.android.internal.os.ZygoteInit.main(ZygoteInit.java:858)
     Caused by: java.lang.IllegalStateException: Flow invariant is violated:
    		Flow was collected in [BlockingCoroutineActive@daf39f2, BlockingEventLoop@a6f9843],
    		but emission happened in [DispatchedCoroutineActive@8ba6ec0, Dispatchers.IO].
    		Please refer to 'flow' documentation or use 'flowOn' instead
        at kotlinx.coroutines.flow.internal.SafeCollector_commonKt.checkContext(SafeCollector.common.kt:84)
        at kotlinx.coroutines.flow.internal.SafeCollector.checkContext(SafeCollector.kt:88)
        at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:74)
        at kotlinx.coroutines.flow.internal.SafeCollector.emit(SafeCollector.kt:59)
        at kim.hsl.coroutine.MainActivity$flowFunction$2$1.invokeSuspend(MainActivity.kt:43)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:571)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:678)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:665)
2022-12-23 14:39:06.505 19710-19710/kim.hsl.coroutine I/Process: Sending signal. PID: 19710 SIG: 9





二、修改流发射的协程上下文



在上述 流的收集流的发射必须在同一个协程中执行 , 这样并不是我们想要的 ;

如 : 下载时 , 想要在后台线程中下载 , 在主线程中更新 UI , 那么对应 Flow 异步流应该是在 后台线程中 发射元素 , 在主线程中 收集元素 ;


使用 flowOn 操作符 , 可以修改 流发射 的协程上下文 , 不必必须在 流收集 的协程上下文中执行 流发射操作 ;



1、Flow#flowOn 函数原型


Flow#flowOn 函数原型如下 :

/**
 * 将此流执行的上下文更改为给定的[context]。
 * 此操作符是可组合的,仅影响前面没有自己上下文的操作符。
 * 这个操作符是上下文保护的:[context] **不会**泄漏到下游流中。
 *
 * 例如:
 *
 * ```
 * withContext(Dispatchers.Main) 
 *     val singleValue = intFlow // will be executed on IO if context wasn't specified before
 *         .map  ...  // Will be executed in IO
 *         .flowOn(Dispatchers.IO)
 *         .filter  ...  // Will be executed in Default
 *         .flowOn(Dispatchers.Default)
 *         .single() // Will be executed in the Main
 * 
 * ```
 *
 * 有关上下文保存的更多说明,请参考[Flow]文档。
 *
 * 如果更改上下文不需要更改,则此操作符保留流的_sequential_性质
 * (调度)[CoroutineDispatcher]。否则,如果需要更改dispatcher,它将进行收集
 * 使用指定[上下文]运行的协同例程中的流发射,并从另一个协同例程中发射它们
 * 使用带有[default][channel]的通道与原始收集器的上下文连接。BUFFERED]缓冲区大小
 * 在两个协程之间,类似于[buffer]操作符,除非显式调用[buffer]操作符
 * 在' flowOn '之前或之后,请求缓冲行为并指定通道大小。
 *
 * 注意,跨不同调度程序操作的流在取消时可能会丢失一些正在运行的元素。
 * 特别是,该操作符确保下游流不会在取消时恢复,即使元素
 * 已经被上游的气流释放出来了。
 *
 * ###算子融合
 *
 * 相邻的[channelFlow]、[flowOn]、[buffer]和[produceIn]的应用是
 * 始终融合,以便只有一个正确配置的通道用于执行。
 *
 * 多个“flowOn”操作符融合到一个具有组合上下文的单一“flowOn”。上下文的要素
 * 第一个' flowOn '操作符自然优先于第二个' flowOn '操作符的元素
 * 当它们具有相同的上下文键时,例如:
 *
 * ```
 * flow.map  ...  // Will be executed in IO
 *     .flowOn(Dispatchers.IO) // This one takes precedence
 *     .flowOn(Dispatchers.Default)
 * ```
 *
 * 请注意,[SharedFlow]的实例本身没有执行上下文,
 * 所以应用' flowOn '到' SharedFlow '没有效果。参见[SharedFlow]关于Operator Fusion的文档。
 *
 * @throws [IllegalArgumentException] 如果所提供的上下文包含[Job]实例。
 */
public fun <T> Flow<T>.flowOn(context: CoroutineContext): Flow<T> 
    checkFlowContext(context)
    return when 
        context == EmptyCoroutineContext -> this
        this is FusibleFlow -> fuse(context = context)
        else -> ChannelFlowOperatorImpl(this, context = context)
    


2、代码示例


代码示例 :

package kim.hsl.coroutine

import android.os.Bundle
import androidx.appcompat.app.AppCompatActivity
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.runBlocking

class MainActivity : AppCompatActivity() 
    override fun onCreate(savedInstanceState: Bundle?) 
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // 携程中调用挂起函数返回一个 Flow 异步流
        runBlocking 
            println("流收集时的协程上下文 : $Thread.currentThread().name")
            // 调用 Flow#collect 函数, 可以获取在异步流中产生的元素
            flowFunction().collect 
                // 每隔 500ms 即可拿到一个 Int 元素
                // 并且该操作是异步操作, 不会阻塞调用线程
                println(it)
            
        
    

    /**
     * 使用 flow 构建器 Flow 异步流
     * 在该异步流中, 异步地产生 Int 元素
     */
    suspend fun flowFunction() = flow<Int> 

        println("流构建器的上下文 : $Thread.currentThread().name")
        for (i in 0..2) 
            // 挂起函数 挂起 500ms
            // 在协程中, 该挂起操作不会阻塞调用线程, 会继续执行其它代码指令
            // 500ms 恢复执行, 继续执行挂起函数之后的后续代码指令
            delay(500)
            // 每隔 500ms 产生一个元素
            // 通过调用 FlowCollector#emit 生成一个元素
            emit(i)
        

    .flowOn(Dispatchers.IO)

执行结果 : 没有报错 , 并且 流发射 在子线程中执行 , 流收集 在 主线程中执行 ;

2022-12-23 14:50:32.925 21339-21339/kim.hsl.coroutine I/System.out: 流收集时的协程上下文 : main
2022-12-23 14:50:32.991 21339-21374/kim.hsl.coroutine I/System.out: 流构建器的上下文 : DefaultDispatcher-worker-2
2022-12-23 14:50:33.512 21339-21339/kim.hsl.coroutine I/System.out: 0
2022-12-23 14:50:34.038 21339-21339/kim.hsl.coroutine I/System.out: 1
2022-12-23 14:50:34.583 21339-21339/kim.hsl.coroutine I/System.out: 2

以上是关于Kotlin 协程Flow 异步流 ⑤ ( 流的上下文 | 上下文保存 | 查看流发射和收集的协程 | 不能在不同协程中执行流的发射和收集操作 | 修改流发射的协程上下文 | flowOn函数 )的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

Kotlin 协程Flow 异步流 ④ ( 流的构建器函数 | flow 构建器函数 | flowOf 构建器函数 | asFlow 构建器函数 )

Kotlin 协程Flow 异步流 ③ ( 冷流 | 流被收集时运行 | 流的连续性 )

Kotlin 协程Flow 异步流 ③ ( 冷流 | 流被收集时运行 | 流的连续性 )

Kotlin 协程Flow 异步流 ⑦ ( 调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测 | 启用检测 Flow 流的取消cancellable函数 )

Kotlin 协程Flow 异步流 ⑦ ( 调用 FlowCollector#emit 发射元素时自动执行 Flow 流的取消检测 | 启用检测 Flow 流的取消cancellable函数 )