onEach 更改 StateFlow 中的调度程序(kotlin 协程)
Posted
技术标签:
【中文标题】onEach 更改 StateFlow 中的调度程序(kotlin 协程)【英文标题】:onEach changes the dispatcher in StateFlow (kotlin coroutines) 【发布时间】:2021-02-25 14:40:49 【问题描述】:想象一下以下独立的测试用例
@Test
fun `stateFlow in GlobalScope`() = runBlockingTest
suspend fun makeHeavyRequest(): String
return "heavy result"
val flow1 = flowOf(Unit)
.map makeHeavyRequest()
.onEach logThread("1: before flowOn")
.flowOn(testDispatcher)
.stateIn(GlobalScope, SharingStarted.Lazily, "init state")
val flow2 = flowOf(Unit)
.map makeHeavyRequest()
.onEach logThread("2: before flowOn")
.flowOn(testDispatcher)
.stateIn(GlobalScope, SharingStarted.Lazily, "init state")
.onEach logThread("2: after stateIn")
val flow3 = flowOf(Unit)
.map makeHeavyRequest()
.onEach logThread("3: before flowOn")
.flowOn(testDispatcher)
.onEach logThread("3: after flowOn")
.stateIn(GlobalScope, SharingStarted.Lazily, "init state")
flow1.test
assertEquals("heavy result", expectItem())
cancelAndIgnoreRemainingEvents()
flow2.test
assertEquals("heavy result", expectItem())
cancelAndIgnoreRemainingEvents()
flow3.test
assertEquals("heavy result", expectItem())
cancelAndIgnoreRemainingEvents()
运行它的效果会是:
Thread (1: before flowOn): Thread[main @coroutine#2,5,main]
Thread (2: before flowOn): Thread[main @coroutine#3,5,main]
Thread (2: after stateIn): Thread[main @coroutine#6,5,main]
Thread (3: before flowOn): Thread[DefaultDispatcher-worker-1 @coroutine#8,5,main]
Thread (3: after flowOn): Thread[DefaultDispatcher-worker-1 @coroutine#4,5,main]
org.opentest4j.AssertionFailedError:
Expected :heavy result
Actual :init state
在flow3
中将onEach
放在flowOn
和stateIn
之间会完全改变调度程序并弄乱结果。这是为什么呢?
【问题讨论】:
你能分享你的Flow.test
实现吗(或者如果它是一个依赖项,是哪一个)。
这是Turbine by square,但是当我使用标准方法收集一些列表中的项目然后取消热流时,结果是一样的。
【参考方案1】:
发生这种情况的原因是stateIn
运算符根据上游流是否为ChannelFlow
进行了一些优化。 .flowOn(...)
返回 ChannelFlow
而 .onEach(...)
不返回。
通常这并不重要。为什么在您的情况下很重要,因为您希望 stateIn
返回的流永远不会发出初始值。但是这个参数是强制性的,你应该期望收到初始值是有原因的。你是否真的这样做主要取决于上游流是否能够在不暂停的情况下发出值。
现在看来,stateIn
运算符的优化之一是,它可能会在不暂停的情况下消耗 ChannelFlow。这就是为什么您在使用时会得到预期的行为
.flowOn(testDispatcher) /*returns ChannelFlow*/
.stateIn(GlobalScope, SharingStarted.Lazily, "init state")
【讨论】:
这是一个很好的解释。让我澄清一件事。我很惊讶我在测试场景中没有得到“初始化状态”。你说“你是否真正做主要取决于上游流是否能够在不暂停的情况下发出一个值。” - 如果我,例如把delay()
放在我的makeHeavyRequest()
中我会得到“初始化状态”和“重结果”?
好的,我刚刚检查过了。它就像你说的那样工作,我只需要testDispatcher.advanceTimeBy
。老实说,这种行为对我来说有点奇怪(来自 rxjava 世界并寻找 BehaviorProcessor 类似物)。
还有一件事 - 我知道 ChannelFlow 有一些优化。但这最终不是一个错误吗?你觉得我应该举报吗?
我不会称其为错误,更多的是可以更好地记录的行为。通常在使用 stateflow 时,无论如何您都希望对最近的值进行操作,那么为什么要保留初始值,这可能只是一个占位符,如果流已经发出了一个值【参考方案2】:
您应该使用MainScope
而不是GlobalScope
。
Global Scope
是一个不受控制的范围。
https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/-main-scope.html
您可以在协程中找到更多好的模式: https://medium.com/androiddevelopers/coroutines-patterns-for-work-that-shouldnt-be-cancelled-e26c40f142ad
【讨论】:
确实,将范围更改为我可以控制的内容并将其调度程序更改为 testDispatcher,确实可以修复测试用例。谢谢!以上是关于onEach 更改 StateFlow 中的调度程序(kotlin 协程)的主要内容,如果未能解决你的问题,请参考以下文章
收集 StateFlow 时 Api 没有响应不会更改 kotlin
从 Fragment 返回时,Flow onEach/collect 被多次调用