合并 Observable 列表并等待所有完成
Posted
技术标签:
【中文标题】合并 Observable 列表并等待所有完成【英文标题】:Combine a list of Observables and wait until all completed 【发布时间】:2016-05-23 07:49:55 【问题描述】:TL;DR
如何将Task.whenAll(List<Task>)
转换成RxJava
?
我现有的代码使用 Bolts 构建异步任务列表,并等待所有这些任务完成后再执行其他步骤。本质上,它构建了一个List<Task>
并返回一个Task
,当列表中的所有 任务完成时,它被标记为已完成,根据example on the Bolts site。
我希望用RxJava
替换Bolts
,并且我假设这种建立异步任务列表的方法(事先不知道大小)并将它们全部包装成一个Observable
是可能的,但我不知道如何。
我已经尝试查看merge
、zip
、concat
等...但无法开始使用我将建立的List<Observable>
,因为它们似乎都适合工作如果我对文档的理解正确,一次只写两个Observables
。
我正在努力学习 RxJava
并且对它仍然很陌生,所以如果这是一个明显的问题或在文档中的某个地方进行了解释,请原谅我;我试过搜索。任何帮助将不胜感激。
【问题讨论】:
【参考方案1】:如果您有动态任务组合,您可以使用flatMap
。像这样的:
public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks)
return Observable.from(tasks)
//execute in parallel
.flatMap(task -> task.observeOn(Schedulers.computation()))
//wait, until all task are executed
//be aware, all your observable should emit onComplete event
//otherwise you will wait forever
.toList()
//could implement more intelligent logic. eg. check that everything is successful
.map(results -> true);
Another good example of parallel execution
注意:我真的不知道您对错误处理的要求。例如,如果只有一项任务失败该怎么办。我认为你应该验证这种情况。
【讨论】:
考虑到问题状态为“列表中的所有任务完成时”,这应该是公认的答案。zip
在其中一项任务完成后立即通知完成,因此不适用。
@MyDogTom :你能用 Java7 语法(不是 lambda)版本更新答案吗?
@PoojaGaikwad 使用 lambda 它更具可读性。只需将第一个 lambda 替换为 new Func1<Observable<Boolean>, Observable<Boolean>>()...
,将第二个替换为 new Func1<List<Boolean>, Boolean>()
@soshial RxJava 2 是 RxJava 发生过的最糟糕的事情,是的【参考方案2】:
如果你使用 Project Reactor,你可以使用Mono.when
。
Mono.when(publisher1, publisher2)
.map(i->
System.out.println("everything is done!");
return i;
).block()
【讨论】:
【参考方案3】:我遇到了类似的问题,我需要从 rest 调用中获取搜索项,同时还集成来自 RecentSearchProvider.AUTHORITY 的已保存建议并将它们组合到一个统一列表中。我试图使用 @MyDogTom 解决方案,不幸的是 RxJava 中没有 Observable.from 。经过一番研究,我得到了一个适合我的解决方案。
fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>>
val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
fetchedItems.add(fetchSearchSuggestions(context,query).toObservable())
fetchedItems.add(getSearchResults(query).toObservable())
return Observable.fromArray(fetchedItems)
.flatMapIterable data->data
.flatMap task -> task.observeOn(Schedulers.io())
.toList()
.map ArrayList(it)
我从 observables 数组中创建了一个 observable,其中包含来自 Internet 的建议和结果列表,具体取决于查询。之后,您只需使用 flatMapIterable 处理这些任务并使用 flatmap 运行它们,将结果放入数组中,稍后可以将其提取到回收视图中。
【讨论】:
【参考方案4】:使用 Kotlin
Observable.zip(obs1, obs2, BiFunction t1 : Boolean, t2:Boolean ->
)
设置函数参数的类型很重要,否则会出现编译错误
最后一个参数类型随着参数的数量而变化: 双功能 2 功能 3 为 3 功能 4 对 4 ...
【讨论】:
【参考方案5】:在提出的建议中,zip() 实际上将可观察到的结果相互结合,这可能是也可能不是想要的,但在问题中没有被问到。在这个问题中,所需要的只是执行每个操作,一个接一个或并行(未指定,但链接的 Bolts 示例是关于并行执行)。此外,当任何 observables 完成时, zip() 将立即完成,因此它违反了要求。
对于 Observables 的并行执行,flatMap() presented in the other answer 很好,但 merge() 会更直接。请注意,合并将在任何可观察对象发生错误时退出,如果您宁愿推迟退出直到所有可观察对象完成,您应该查看mergeDelayError()。
对于一个一个,我认为应该使用Observable.concat() static method。它的 javadoc 状态如下:
concat(java.lang.Iterable> 序列) 将一个 Observable 的 Iterable 扁平化为一个 Observable,一个接一个,不交错
如果您不想并行执行,这听起来像是您所追求的。
另外,如果您只对任务的完成感兴趣,而不是返回值,您可能应该查看Completable 而不是Observable。
TLDR:对于任务和 oncompletion 事件完成时的一对一执行,我认为 Completable.concat() 最适合。对于并行执行,Completable.merge() 或 Completable.mergeDelayError() 听起来像是解决方案。前者会在任何可完成程序上出现任何错误时立即停止,后者即使其中一个有错误也会全部执行,然后才报告错误。
【讨论】:
【参考方案6】:我正在使用 JavaRx Observables 和 RxKotlin 在 Kotlin 中编写一些计算量大的代码。我想观察一个待完成的观察列表,同时给我一个关于进度和最新结果的更新。最后返回最佳计算结果。一个额外的要求是并行运行 Observables 以使用我所有的 cpu 内核。我最终得到了这个解决方案:
@Volatile var results: MutableList<CalculationResult> = mutableListOf()
fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>>
return Observable.create subscriber ->
Observable.concatEager(listOfCalculations.map calculation: Calculation ->
doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
).subscribeBy(
onNext =
results.add(it)
subscriber.onNext(Pair("A calculation is ready", it))
,
onComplete =
subscriber.onNext(Pair("Finished: $results.size", findBestCalculation(results))
subscriber.onComplete()
,
onError =
subscriber.onError(it)
)
【讨论】:
不熟悉 RxKotlin 或@Volatile
,但是如果它被多个线程同时调用,这将如何工作?结果会怎样?【参考方案7】:
听起来您正在寻找Zip operator。
有几种不同的使用方式,让我们看一个例子。假设我们有一些不同类型的简单可观察对象:
Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);
等待它们的最简单方法是这样的:
Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));
请注意,在 zip 函数中,参数具有对应于被压缩的 observables 类型的具体类型。
也可以直接压缩 observables 列表:
List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);
Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
...或将列表包装成Observable<Observable<?>>
:
Observable<Observable<?>> obsObs = Observable.from(obsList);
Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));
然而,在这两种情况下,zip 函数只能接受一个Object[]
参数,因为事先不知道列表中可观察对象的类型及其数量。这意味着 zip 函数必须检查参数的数量并相应地转换它们。
不管怎样,以上所有的例子最终都会打印出1 Blah true
编辑: 使用 Zip 时,确保被压缩的 Observables
都发出相同数量的项目。在上面的例子中,所有三个 observables 都发出了一个 item。如果我们把它们改成这样:
Observable<Integer> obs1 = Observable.from(new Integer[]1,2,3); //Emits three items
Observable<String> obs2 = Observable.from(new String[]"Blah","Hello"); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]true,true); //Emits two items
那么1, Blah, True
和2, Hello, True
将是传递给 zip 函数的唯一项目。项目 3
将永远不会被压缩,因为其他 observables 已经完成。
【讨论】:
如果其中一个调用失败,这将不起作用。在这种情况下,所有呼叫都将丢失。 @StarWind0 你可以使用onErrorResumeNext
跳过错误,例如:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.<String>empty())
如果我有 100 个 observables 怎么办?
处理错误这里最好的方法是什么【参考方案8】:
您可能看过zip
运算符,它适用于 2 个 Observable。
还有静态方法Observable.zip
。它有一种对你有用的形式:
zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)
您可以查看javadoc for more.
【讨论】:
以上是关于合并 Observable 列表并等待所有完成的主要内容,如果未能解决你的问题,请参考以下文章
TypeScript - 等待 observable/promise 完成,然后返回 observable