合并 Observable 列表并等待所有完成

Posted

技术标签:

【中文标题】合并 Observable 列表并等待所有完成【英文标题】:Combine a list of Observables and wait until all completed 【发布时间】:2016-05-23 07:49:55 【问题描述】:

TL;DR 如何将Task.whenAll(List<Task>)转换成RxJava

我现有的代码使用 Bolts 构建异步任务列表,并等待所有这些任务完成后再执行其他步骤。本质上,它构建了一个List<Task> 并返回一个Task,当列表中的所有 任务完成时,它被标记为已完成,根据example on the Bolts site。

我希望用RxJava 替换Bolts,并且我假设这种建立异步任务列表的方法(事先不知道大小)并将它们全部包装成一个Observable 是可能的,但我不知道如何。

我已经尝试查看mergezipconcat 等...但无法开始使用我将建立的List<Observable>,因为它们似乎都适合工作如果我对文档的理解正确,一次只写两个Observables

我正在努力学习 RxJava 并且对它仍然很陌生,所以如果这是一个明显的问题或在文档中的某个地方进行了解释,请原谅我;我试过搜索。任何帮助将不胜感激。

【问题讨论】:

【参考方案1】:

如果您有动态任务组合,您可以使用flatMap。像这样的:

public Observable<Boolean> whenAll(List<Observable<Boolean>> tasks) 
    return Observable.from(tasks)
            //execute in parallel
            .flatMap(task -> task.observeOn(Schedulers.computation()))
            //wait, until all task are executed
            //be aware, all your observable should emit onComplete event
            //otherwise you will wait forever
            .toList()
            //could implement more intelligent logic. eg. check that everything is successful
            .map(results -> true);

Another good example of parallel execution

注意:我真的不知道您对错误处理的要求。例如,如果只有一项任务失败该怎么办。我认为你应该验证这种情况。

【讨论】:

考虑到问题状态为“列表中的所有任务完成时”,这应该是公认的答案。 zip 在其中一项任务完成后立即通知完成,因此不适用。 @MyDogTom :你能用 Java7 语法(不是 lambda)版本更新答案吗? @PoojaGaikwad 使用 lambda 它更具可读性。只需将第一个 lambda 替换为 new Func1&lt;Observable&lt;Boolean&gt;, Observable&lt;Boolean&gt;&gt;()...,将第二个替换为 new Func1&lt;List&lt;Boolean&gt;, Boolean&gt;() @soshial RxJava 2 是 RxJava 发生过的最糟糕的事情,是的【参考方案2】:

如果你使用 Project Reactor,你可以使用Mono.when

Mono.when(publisher1, publisher2)
.map(i-> 
    System.out.println("everything is done!");
    return i;
).block()

【讨论】:

【参考方案3】:

我遇到了类似的问题,我需要从 rest 调用中获取搜索项,同时还集成来自 RecentSearchProvider.AUTHORITY 的已保存建议并将它们组合到一个统一列表中。我试图使用 @MyDogTom 解决方案,不幸的是 RxJava 中没有 Observable.from 。经过一番研究,我得到了一个适合我的解决方案。

 fun getSearchedResultsSuggestions(context : Context, query : String) : Single<ArrayList<ArrayList<SearchItem>>>

    val fetchedItems = ArrayList<Observable<ArrayList<SearchItem>>>(0)
    fetchedItems.add(fetchSearchSuggestions(context,query).toObservable())
    fetchedItems.add(getSearchResults(query).toObservable())

    return Observable.fromArray(fetchedItems)
        .flatMapIterable  data->data 
        .flatMap task -> task.observeOn(Schedulers.io())
        .toList()
        .map  ArrayList(it) 

我从 observables 数组中创建了一个 observable,其中包含来自 Internet 的建议和结果列表,具体取决于查询。之后,您只需使用 flatMapIterable 处理这些任务并使用 flatmap 运行它们,将结果放入数组中,稍后可以将其提取到回收视图中。

【讨论】:

【参考方案4】:

使用 Kotlin

Observable.zip(obs1, obs2, BiFunction  t1 : Boolean, t2:Boolean ->

)

设置函数参数的类型很重要,否则会出现编译错误

最后一个参数类型随着参数的数量而变化: 双功能 2 功能 3 为 3 功能 4 对 4 ...

【讨论】:

【参考方案5】:

在提出的建议中,zip() 实际上将可观察到的结果相互结合,这可能是也可能不是想要的,但在问题中没有被问到。在这个问题中,所需要的只是执行每个操作,一个接一个或并行(未指定,但链接的 Bolts 示例是关于并行执行)。此外,当任何 observables 完成时, zip() 将立即完成,因此它违反了要求。

对于 Observables 的并行执行,flatMap() presented in the other answer 很好,但 merge() 会更直接。请注意,合并将在任何可观察对象发生错误时退出,如果您宁愿推迟退出直到所有可观察对象完成,您应该查看mergeDelayError()。

对于一个一个,我认为应该使用Observable.concat() static method。它的 javadoc 状态如下:

concat(java.lang.Iterable> 序列) 将一个 Observable 的 Iterable 扁平化为一个 Observable,一个接一个,不交错

如果您不想并行执行,这听起来像是您所追求的。

另外,如果您只对任务的完成感兴趣,而不是返回值,您可能应该查看Completable 而不是Observable。

TLDR:对于任务和 oncompletion 事件完成时的一对一执行,我认为 Completable.concat() 最适合。对于并行执行,Completable.merge() 或 Completable.mergeDelayError() 听起来像是解决方案。前者会在任何可完成程序上出现任何错误时立即停止,后者即使其中一个有错误也会全部执行,然后才报告错误。

【讨论】:

【参考方案6】:

我正在使用 JavaRx Observables 和 RxKotlin 在 Kotlin 中编写一些计算量大的代码。我想观察一个待完成的观察列表,同时给我一个关于进度和最新结果的更新。最后返回最佳计算结果。一个额外的要求是并行运行 Observables 以使用我所有的 cpu 内核。我最终得到了这个解决方案:

@Volatile var results: MutableList<CalculationResult> = mutableListOf()

fun doALotOfCalculations(listOfCalculations: List<Calculation>): Observable<Pair<String, CalculationResult>> 

    return Observable.create  subscriber ->
        Observable.concatEager(listOfCalculations.map  calculation: Calculation ->
            doCalculation(calculation).subscribeOn(Schedulers.computation()) // function doCalculation returns an Observable with only one result
        ).subscribeBy(
            onNext = 
                results.add(it)
                subscriber.onNext(Pair("A calculation is ready", it))

            ,
            onComplete = 
                subscriber.onNext(Pair("Finished: $results.size", findBestCalculation(results)) 
                subscriber.onComplete()
            ,
            onError = 
                subscriber.onError(it)
            
        )
    

【讨论】:

不熟悉 RxKotlin 或 @Volatile,但是如果它被多个线程同时调用,这将如何工作?结果会怎样?【参考方案7】:

听起来您正在寻找Zip operator。

有几种不同的使用方式,让我们看一个例子。假设我们有一些不同类型的简单可观察对象:

Observable<Integer> obs1 = Observable.just(1);
Observable<String> obs2 = Observable.just("Blah");
Observable<Boolean> obs3 = Observable.just(true);

等待它们的最简单方法是这样的:

Observable.zip(obs1, obs2, obs3, (Integer i, String s, Boolean b) -> i + " " + s + " " + b)
.subscribe(str -> System.out.println(str));

请注意,在 zip 函数中,参数具有对应于被压缩的 observables 类型的具体类型。

也可以直接压缩 observables 列表:

List<Observable<?>> obsList = Arrays.asList(obs1, obs2, obs3);

Observable.zip(obsList, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

...或将列表包装成Observable&lt;Observable&lt;?&gt;&gt;:

Observable<Observable<?>> obsObs = Observable.from(obsList);

Observable.zip(obsObs, (i) -> i[0] + " " + i[1] + " " + i[2])
.subscribe(str -> System.out.println(str));

然而,在这两种情况下,zip 函数只能接受一个Object[] 参数,因为事先不知道列表中可观察对象的类型及其数量。这意味着 zip 函数必须检查参数的数量并相应地转换它们。

不管怎样,以上所有的例子最终都会打印出1 Blah true

编辑: 使用 Zip 时,确保被压缩的 Observables 都发出相同数量的项目。在上面的例子中,所有三个 observables 都发出了一个 item。如果我们把它们改成这样:

Observable<Integer> obs1 = Observable.from(new Integer[]1,2,3); //Emits three items
Observable<String> obs2 = Observable.from(new String[]"Blah","Hello"); //Emits two items
Observable<Boolean> obs3 = Observable.from(new Boolean[]true,true); //Emits two items

那么1, Blah, True2, Hello, True 将是传递给 zip 函数的唯一项目。项目 3 将永远不会被压缩,因为其他 observables 已经完成。

【讨论】:

如果其中一个调用失败,这将不起作用。在这种情况下,所有呼叫都将丢失。 @StarWind0 你可以使用onErrorResumeNext跳过错误,例如:Observable.zip(ob1, ob2........).onErrorResumeNext(Observable.&lt;String&gt;empty()) 如果我有 100 个 observables 怎么办? 处理错误这里最好的方法是什么【参考方案8】:

您可能看过zip 运算符,它适用于 2 个 Observable。

还有静态方法Observable.zip。它有一种对你有用的形式:

zip(java.lang.Iterable<? extends Observable<?>> ws, FuncN<? extends R> zipFunction)

您可以查看javadoc for more.

【讨论】:

以上是关于合并 Observable 列表并等待所有完成的主要内容,如果未能解决你的问题,请参考以下文章

javascript 等待合并映射的Observable

Rxjs:将中间订阅和完整的可观察对象合并,并整体完成

TypeScript - 等待 observable/promise 完成,然后返回 observable

如何在订阅前等待 Observable 完成

markdown 等待多个observable完成forkJoin

如何让一个 Observable 序列在发射前等待另一个完成?