按需组合嵌套 Observable 的最新值

Posted

技术标签:

【中文标题】按需组合嵌套 Observable 的最新值【英文标题】:Combine the latest values of a nested Observable on demand 【发布时间】:2019-02-26 08:26:07 【问题描述】:

我有一些自定义字段,每个字段都有一个可以使用BehaviorSubject<Value> 检索的值。显示的字段基于我从 API 获得的内容,所以最后我有 n 个 BehaviorSubject<Value>s。我想将这些值组合到一个Observable<List<Value>> 中,其中列表包含来自这些字段的最新值(顺序无关紧要)。但问题是这些字段并非同时可用,因为它们是在 UI 加载时创建的,所以我不能将 Observable.combineLatest 与主题列表一起使用。

我目前所做的是我创建了以下变量:

private val values = BehaviorSubject.create<Pair<Int, Value>>()

我使用这个主题来订阅该领域的所有主题,但首先将主题与他们的位置进行映射并制作一对。

fieldSubject.map 
    Pair(position, value)
.subscribe(values)

然后我想做的是根据它们在对中的位置对值进行分组,并获得一个Observable&lt;List&lt;Value&gt;&gt;,其中列表包含每个位置的最新值。但是我不知道在使用groupBy对它们进行分组后如何进行:

values.groupBy 
    it.first

这会产生Observable&lt;GroupedObservable&lt;Pair, Value&gt;&gt;&gt;。最后这就是我认为我应该如何到达Observable&lt;List&lt;Value&gt;&gt;,但我不知道从这里做什么。

【问题讨论】:

【参考方案1】:

groupBy 在这里似乎对我没有帮助。

累计值一般使用scan完成,这里可以使用如下转换:

values.scanWith( arrayOfNulls<Value>(N) )  acc, (index, value) ->
    acc.copyOf().apply 
        set(index, value)
    

    .map  it.filterNotNull() 

如果没有copyOf(),你可能会侥幸逃脱,但我想我过去在累加器函数不纯时遇到了问题。

顺便说一句,你可以写position to value而不是Pair(position, value)

此外,您可以使用merge 获取Observable&lt;Pair&lt;Int, Value&gt;&gt;,而不是创建BehaviorSubject 并手动订阅所有字段:

Observable.mergeArray(
    fieldX.map  0 to it ,
    fieldY.map  1 to it ,
    fieldZ.map  2 to it 
    // ...
)

总的来说,您可以拥有一个为您完成所有工作的函数:

inline fun <reified T : Any> accumulateLatest(vararg sources: Observable<out T>): Observable<List<T>> 
    return Observable.merge(sources.mapIndexed  index, observable ->
        observable.map  index to it 
    )
        .scanWith( arrayOfNulls<T>(sources.size) )  acc, (index, value) ->
            acc.copyOf().apply 
                set(index, value)
            
        
        .map  it.filterNotNull() 

然后只需调用:

accumulateLatest(fieldX, fieldY, fieldZ)
    .subscribe 
        println("Latest list: $it")
    

【讨论】:

非常感谢。我不知道 ReactiveX 中的扫描功能。我理解您为什么要保持它的纯净,并感谢 Pair 的提示! 没问题!我已经更新了答案,因为整个功能可以封装在一个函数中(如果这符合您的目的)。

以上是关于按需组合嵌套 Observable 的最新值的主要内容,如果未能解决你的问题,请参考以下文章

“MonoTypeOperatorFunction<any>”类型的参数不可分配给“UnaryFunction<Obs​​ervable<any>, Observable

如何从 Observable Array 嵌套对象中获取值

如何延迟创建可观察对象

Angular项目中的Rxjs websockets

一个一个可观察的 RxJS

RxJS之组合操作符 ( Angular环境 )