从多个状态流中收集

Posted

技术标签:

【中文标题】从多个状态流中收集【英文标题】:Collect from several stateflows 【发布时间】:2021-08-20 07:14:49 【问题描述】:

我的 viewModel 中有 2 个 stateFlow。要在片段中收集它们,我必须启动协程 2 次,如下所示:

    lifecycleScope.launchWhenStarted 
        stocksVM.quotes.collect 
            if (it is Resource.Success) 
                it.data?.let  list ->
                    quoteAdapter.submitData(list)
                
            
        
    

    lifecycleScope.launchWhenStarted 
        stocksVM.stockUpdate.collect 
            log(it.data?.data.toString())
        
    

如果我有更多的 stateFlow,我必须分别启动协程。有没有更好的方法来处理我的 Fragment/Activity 或任何地方的多个 stateFlow?

【问题讨论】:

是什么阻止您在一个范围内进行收集?就像你启动 lifecycleScope.launchWhenStarted 一样,你不能只在里面 stocksVM.quotes.collectstocksVM.stockUpdate.collect 不幸的是,我不能。因为 collect() 是协程内的挂起函数,它会挂起直到我的流程停止,因此我的下一个 collect() 不会被调用,直到前一个流程终止 @che10 【参考方案1】:

您将需要不同的协程,因为collect() 是一个暂停函数,它会暂停直到您的Flow 终止。

为了收集多个流,目前推荐的方法是:

lifecycleScope.launch 
    lifecycle.repeatOnLifecycle(Lifecycle.State.STARTED) 
        launch 
          stocksVM.quotes.collect  ...    
        
    
        launch 
            stocksVM.stockUpdate.collect  ... 
        
    

请注意,launchWhenStarted 的问题在于,虽然您新发出的项目不会被处理,但您的生产者仍将在后台运行。

我肯定会读一读,因为它很好地解释了当前的最佳实践:https://medium.com/androiddevelopers/a-safer-way-to-collect-flows-from-android-uis-23080b1f8bda

【讨论】:

感谢您的帮助!我一定会读到的【参考方案2】:

您可以选择混合多个流。

kotlin 中使用函数mergecombine。当然,这两个函数的用法是不一样的。


添加:

如果Flow没有被处理,打开多个Coroutine来collect():

fun main() 
    collectFlow()


fun emitStringElem(): Flow<String> = flow 
    repeat(5) 
        delay(10)
        emit("elem_$it")
    


fun emitIntElem(): Flow<Int> = flow 
    repeat(10) 
        delay(10)
        emit(it)
    

打开两个协程集合 结果是:

From int Flow: item is: 0
From string Flow: item is: elem_0
From int Flow: item is: 1
From string Flow: item is: elem_1
From int Flow: item is: 2
From string Flow: item is: elem_2
From int Flow: item is: 3
From string Flow: item is: elem_3
From int Flow: item is: 4
From string Flow: item is: elem_4
From int Flow: item is: 5
From int Flow: item is: 6
From int Flow: item is: 7
From int Flow: item is: 8
From int Flow: item is: 9

合并两个流

fun margeFlow() = runBlocking 
    merge(
        emitIntElem().map 
            it.toString()
        , emitStringElem()
    ).collect 
        println(it)
    

结果是:

0
elem_0
1
elem_1
2
elem_2
3
elem_3
4
elem_4
5
6
7
8
9

合并两个流程:

fun combineFlow() = runBlocking 
    combine(emitIntElem(), emitStringElem())  int: Int, str: String ->
        "$int combine $str"
    .collect 
        println(it)
    

结果是:

0 combine elem_0
1 combine elem_0
1 combine elem_1
2 combine elem_2
3 combine elem_3
4 combine elem_4
5 combine elem_4
6 combine elem_4
7 combine elem_4
8 combine elem_4
9 combine elem_4

【讨论】:

感谢您的帮助。我会考虑的【参考方案3】:

就像@RóbertNagy 所说,你不应该使用launchWhenStarted。但是有一种替代语法可以以正确的方式执行此操作,而无需执行嵌套的 launches:

stocksVM.quotes
    .flowOnLifecycle(Lifecycle.State.STARTED)
    .onEach  
        if (it is Resource.Success) 
            it.data?.let  list ->
                quoteAdapter.submitData(list)
            
        
    .launchIn(lifecycleScope)

stocksVM.stockUpdate
    .flowOnLifecycle(Lifecycle.State.STARTED)
    .onEach  
        log(it.data?.data.toString())
    .launchIn(lifecycleScope)

【讨论】:

感谢您的帮助!【参考方案4】:

如果有人想知道如何在viewModelScope.launch 的同一块中发出多个流,这与罗伯特的答案相同。即如下

viewModelScope.launch 
    launch 
        exampleFlow1.emit(data)
    
    launch 
        exampleFlow2.emit(data)
    

【讨论】:

以上是关于从多个状态流中收集的主要内容,如果未能解决你的问题,请参考以下文章

如何从 Python 中的文件/流中懒惰地读取多个 JSON 值?

用于从流中读取多个 protobuf 消息的 python 示例

从 Rust 中的多个音频流中并行获取相同大小的块

Akka Streams:流中的状态

如何在从 NodeJS 中的多个输入流中读取时写入单个文件

如何在一个流中合成多个图像