onEach 更改 StateFlow 中的调度程序(kotlin 协程)

Posted

技术标签:

【中文标题】onEach 更改 StateFlow 中的调度程序(kotlin 协程)【英文标题】:onEach changes the dispatcher in StateFlow (kotlin coroutines) 【发布时间】:2021-02-25 14:40:49 【问题描述】:

想象一下以下独立的测试用例

@Test
fun `stateFlow in GlobalScope`() = runBlockingTest 

    suspend fun makeHeavyRequest(): String 
        return "heavy result"
    

    val flow1 = flowOf(Unit)
        .map  makeHeavyRequest() 
        .onEach  logThread("1: before flowOn") 
        .flowOn(testDispatcher)
        .stateIn(GlobalScope, SharingStarted.Lazily, "init state")

    val flow2 = flowOf(Unit)
        .map  makeHeavyRequest() 
        .onEach  logThread("2: before flowOn") 
        .flowOn(testDispatcher)
        .stateIn(GlobalScope, SharingStarted.Lazily, "init state")
        .onEach  logThread("2: after stateIn") 

    val flow3 = flowOf(Unit)
        .map  makeHeavyRequest() 
        .onEach  logThread("3: before flowOn") 
        .flowOn(testDispatcher)
        .onEach  logThread("3: after flowOn") 
        .stateIn(GlobalScope, SharingStarted.Lazily, "init state")

    flow1.test 
        assertEquals("heavy result", expectItem())
        cancelAndIgnoreRemainingEvents()
    

    flow2.test 
        assertEquals("heavy result", expectItem())
        cancelAndIgnoreRemainingEvents()
    

    flow3.test 
        assertEquals("heavy result", expectItem())
        cancelAndIgnoreRemainingEvents()
    


运行它的效果会是:

Thread (1: before flowOn): Thread[main @coroutine#2,5,main]
Thread (2: before flowOn): Thread[main @coroutine#3,5,main]
Thread (2: after stateIn): Thread[main @coroutine#6,5,main]
Thread (3: before flowOn): Thread[DefaultDispatcher-worker-1 @coroutine#8,5,main]
Thread (3: after flowOn): Thread[DefaultDispatcher-worker-1 @coroutine#4,5,main]


org.opentest4j.AssertionFailedError: 
Expected :heavy result
Actual   :init state

flow3 中将onEach 放在flowOnstateIn 之间会完全改变调度程序并弄乱结果。这是为什么呢?

【问题讨论】:

你能分享你的Flow.test 实现吗(或者如果它是一个依赖项,是哪一个)。 这是Turbine by square,但是当我使用标准方法收集一些列表中的项目然后取消热流时,结果是一样的。 【参考方案1】:

发生这种情况的原因是stateIn 运算符根据上游流是否为ChannelFlow 进行了一些优化。 .flowOn(...) 返回 ChannelFlow.onEach(...) 不返回。

通常这并不重要。为什么在您的情况下很重要,因为您希望 stateIn 返回的流永远不会发出初始值。但是这个参数是强制性的,你应该期望收到初始值是有原因的。你是否真的这样做主要取决于上游流是否能够在不暂停的情况下发出值。

现在看来,stateIn 运算符的优化之一是,它可能会在不暂停的情况下消耗 ChannelFlow。这就是为什么您在使用时会得到预期的行为

.flowOn(testDispatcher) /*returns ChannelFlow*/
.stateIn(GlobalScope, SharingStarted.Lazily, "init state")

【讨论】:

这是一个很好的解释。让我澄清一件事。我很惊讶我在测试场景中没有得到“初始化状态”。你说“你是否真正做主要取决于上游流是否能够在不暂停的情况下发出一个值。” - 如果我,例如把delay() 放在我的makeHeavyRequest() 中我会得到“初始化状态”和“重结果”? 好的,我刚刚检查过了。它就像你说的那样工作,我只需要testDispatcher.advanceTimeBy。老实说,这种行为对我来说有点奇怪(来自 rxjava 世界并寻找 BehaviorProcessor 类似物)。 还有一件事 - 我知道 ChannelFlow 有一些优化。但这最终不是一个错误吗?你觉得我应该举报吗? 我不会称其为错误,更多的是可以更好地记录的行为。通常在使用 stateflow 时,无论如何您都希望对最近的值进行操作,那么为什么要保留初始值,这可能只是一个占位符,如果流已经发出了一个值【参考方案2】:

您应该使用MainScope 而不是GlobalScopeGlobal Scope 是一个不受控制的范围。

https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-main-scope.html

您可以在协程中找到更多好的模式: https://medium.com/androiddevelopers/coroutines-patterns-for-work-that-shouldnt-be-cancelled-e26c40f142ad

【讨论】:

确实,将范围更改为我可以控制的内容并将其调度程序更改为 testDispatcher,确实可以修复测试用例。谢谢!

以上是关于onEach 更改 StateFlow 中的调度程序(kotlin 协程)的主要内容,如果未能解决你的问题,请参考以下文章

我一次可以观察多少个 Stateflow?

在 Flow 中的 onEach 中过滤集合不起作用

收集 StateFlow 时 Api 没有响应不会更改 kotlin

从 Fragment 返回时,Flow onEach/collect 被多次调用

不做跟风党,LiveData,StateFlow,SharedFlow 使用场景对比

不做跟风党,LiveData,StateFlow,SharedFlow 使用场景对比