按需组合嵌套 Observable 的最新值
Posted
技术标签:
【中文标题】按需组合嵌套 Observable 的最新值【英文标题】:Combine the latest values of a nested Observable on demand 【发布时间】:2019-02-26 08:26:07 【问题描述】:我有一些自定义字段,每个字段都有一个可以使用BehaviorSubject<Value>
检索的值。显示的字段基于我从 API 获得的内容,所以最后我有 n 个 BehaviorSubject<Value>
s。我想将这些值组合到一个Observable<List<Value>>
中,其中列表包含来自这些字段的最新值(顺序无关紧要)。但问题是这些字段并非同时可用,因为它们是在 UI 加载时创建的,所以我不能将 Observable.combineLatest
与主题列表一起使用。
我目前所做的是我创建了以下变量:
private val values = BehaviorSubject.create<Pair<Int, Value>>()
我使用这个主题来订阅该领域的所有主题,但首先将主题与他们的位置进行映射并制作一对。
fieldSubject.map
Pair(position, value)
.subscribe(values)
然后我想做的是根据它们在对中的位置对值进行分组,并获得一个Observable<List<Value>>
,其中列表包含每个位置的最新值。但是我不知道在使用groupBy
对它们进行分组后如何进行:
values.groupBy
it.first
这会产生Observable<GroupedObservable<Pair, Value>>>
。最后这就是我认为我应该如何到达Observable<List<Value>>
,但我不知道从这里做什么。
【问题讨论】:
【参考方案1】:groupBy
在这里似乎对我没有帮助。
累计值一般使用scan
完成,这里可以使用如下转换:
values.scanWith( arrayOfNulls<Value>(N) ) acc, (index, value) ->
acc.copyOf().apply
set(index, value)
.map it.filterNotNull()
如果没有copyOf()
,你可能会侥幸逃脱,但我想我过去在累加器函数不纯时遇到了问题。
顺便说一句,你可以写position to value
而不是Pair(position, value)
。
此外,您可以使用merge
获取Observable<Pair<Int, Value>>
,而不是创建BehaviorSubject
并手动订阅所有字段:
Observable.mergeArray(
fieldX.map 0 to it ,
fieldY.map 1 to it ,
fieldZ.map 2 to it
// ...
)
总的来说,您可以拥有一个为您完成所有工作的函数:
inline fun <reified T : Any> accumulateLatest(vararg sources: Observable<out T>): Observable<List<T>>
return Observable.merge(sources.mapIndexed index, observable ->
observable.map index to it
)
.scanWith( arrayOfNulls<T>(sources.size) ) acc, (index, value) ->
acc.copyOf().apply
set(index, value)
.map it.filterNotNull()
然后只需调用:
accumulateLatest(fieldX, fieldY, fieldZ)
.subscribe
println("Latest list: $it")
【讨论】:
非常感谢。我不知道 ReactiveX 中的扫描功能。我理解您为什么要保持它的纯净,并感谢 Pair 的提示! 没问题!我已经更新了答案,因为整个功能可以封装在一个函数中(如果这符合您的目的)。以上是关于按需组合嵌套 Observable 的最新值的主要内容,如果未能解决你的问题,请参考以下文章
“MonoTypeOperatorFunction<any>”类型的参数不可分配给“UnaryFunction<Observable<any>, Observable