如何在 RxJava2 中链接两个 Completable

Posted

技术标签:

【中文标题】如何在 RxJava2 中链接两个 Completable【英文标题】:How to chain two Completable in RxJava2 【发布时间】:2017-07-29 14:10:06 【问题描述】:

我有两个 Completable。我想做以下场景: 如果第一个 Completable 到达 onComplete ,则继续第二个 Completable。最终结果将是第二个 Completable 的 onComplete。

当我有 Single getUserIdAlreadySavedInDevice() 和 Completable login() 时,我就是这样做的:

@Override
public Completable loginUserThatIsAlreadySavedInDevice(String password) 
    return getUserIdAlreadySavedInDevice()
            .flatMapCompletable(s -> login(password, s))


【问题讨论】:

【参考方案1】:

您正在寻找andThen 运营商。

返回一个 Completable,它首先运行这个 Completable,然后运行另一个 Completable。

firstCompletable
    .andThen(secondCompletable)

通常,此运算符是CompletableflatMap 的“替换”:

Completable       andThen(CompletableSource next)
<T> Maybe<T>      andThen(MaybeSource<T> next)
<T> Observable<T> andThen(ObservableSource<T> next)
<T> Flowable<T>   andThen(Publisher<T> next)
<T> Single<T>     andThen(SingleSource<T> next)

【讨论】:

这应该被标记为正确答案。我遇到了与操作员相同的情况,并一直在寻找答案 2 天。我正在寻找您的答案@Maxim Ostrovidov。谢谢! 文档没有说明第二个可完成项是否在第一个错误的情况下被执行。有人知道吗? @BoD 如果流中发生任何错误 - 它会以 onError 事件终止。这是反应流的一般行为。所以,回答你的问题,第二个Completable 不会被执行。 嗯,我不认为andThenflatMapCompletable 的替代品,因为andThen 采用Completable 实例,而flatMapCompletable 采用创建Completable 的函数。前者是急切的,当您调用它时,会创建 Completable,而在后者中,Completable 是在 Completable 完成后懒惰地构造的。对于 Eager 版本,您可以在前者完成之前调用构造 Completable,不是吗? 扎曼尼诺是对的。有了这个,你必须用 Single.fromCallable(() -> eagerCompletable).flatMapCompletable(c -> c) 包装任何渴望的东西。这刚刚咬了我。【参考方案2】:

TL;DR:其他答案错过了一个微妙之处。如果你想要 concat 的等价物,请使用 doThingA().andThen(doThingB()),如果你想要 flatMap 的等价物,请使用 doThingA().andThen(Completable.defer(() -&gt; doThingB())


编辑:更完整的参考

flatMap()merge()的映射版本 concatMap()concat() 的映射版本 对于Completable,您需要defer() 来使函数调用变得惰性,就像在SingleObservable 的映射函数中一样(或者最好让它在您点击订阅之前什么都不会发生 - 这是一个很好的约定遵循并在官方 Rx 库以及我遇到的任何 Rx 扩展中使用,对于高级用户,这仅指冷可完成,但大多数人可以忽略这一点。 concat(a, b)a.andThen(b) 之间的唯一区别是语法

一些例子:

foo(a).andThen(bar(b)) 将:

    致电foo(a) 立即调用bar(b),即使步骤1返回的可完成返回错误 订阅1 返回的任何步骤 然后将订阅bar(b)的结果仅当最后一步成功完成时

foo(a).andThen(Completable.defer(() -&gt; bar(b)) 将:

    致电foo(a) 订阅步骤1的结果 只有当foo(a)返回的可完成函数成功时,才会调用bar(b)

我将省略merge() 的处理,因为它有点复杂,但长话短说,如果你想要“并行性”,那就是调用它。


上述答案有点正确,但我发现它们具有误导性,因为它们忽略了急切评估的微妙之处。

doThingA().andThen(doThingB()) 将立即调用 doThingB(),但仅在 doThingA() 返回的 observable 完成时订阅 doThingB() 返回的 observable。

doThingA().andThen(Completable.defer(() -&gt; doThingB()) 只有在事情 A 完成后才会调用 doThingB()

仅当doThingB() 在订阅事件之前有副作用时,这才重要。例如。 Single.just(sideEffect(1)).toCompletable()

在订阅事件(真正的冷可观察)之前没有副作用的实现可能是Single.just(1).doOnSuccess(i -&gt; sideEffect(i)).toCompletable()

如果这让我很生气,A 是一些验证逻辑,doThingB() 会立即启动异步数据库更新,从而完成 VertX ObservableFuture。这是不好的。可以说doThingB() 应该写成只在订阅时更新数据库,我将在未来尝试以这种方式设计。

【讨论】:

或者只是通过 Observable.create() 或 Observable.fromCallable() 包装了方法 doThingB()。 @Qing 如果 doThingB() 返回一个 observable 或 completable 这些都是有问题的。 Create 是一个较低级别的 API,因此您最终将重新实现 defer()。 Observable.fromCallable() 会导致严重的错误:它将运行可调用对象,然后结果(内部可完成)将被忽略(除非您添加平面图,但这与 defer 的行为相同,但在两个调用中),所以它赢了'不要等待操作完成,任何错误都会丢失。 (编辑:更具建设性的回复)。 @Sparky 嗯就是这样,你有示例代码可以尝试吗,从来没有遇到过,我想,然后会等到内部 observable 完成运行 确实如此;但是 Completable.complete().andThen(Observable.fromCallable(() -> Completable.fromAction(() -> System.out.println("boo"))).subscribe() 不会打印任何内容,因为内部可完成从未订阅过。 下面是一个错误的 doThingB() 示例: Completable doThingB() System.out.println("前半部分工作"); return Completable.fromAction(() -> System.out.println("工作的后半部分"); 。开发人员认为它在成功时打印两行,或者在失败时不打印任何行,因为这是 flatMap 的工作方式。但是 Completable。 complete().andThen(Observable.fromCallable(() -> doThingB())).subscribe() 和 Completable.error(new RuntimeException()).andThen(doThingB()).subscribe() 将打印“前半部分工作”。要重现平面图的行为,您需要使用 Completable.defer(() -> doThingB())。【参考方案3】:

试试

Completable.concat

Returns a Completable which completes only when all sources complete, one after another.

http://reactivex.io/RxJava/javadoc/io/reactivex/Completable.html#concat(java.lang.Iterable)

【讨论】:

Concat 会导致与上述相同的问题。它会立即解决。所以如果你真的依赖你的电话,你应该坚持 doThingA().andThen(Completable.defer(() -> doThingB())。【参考方案4】:

请注意,这里获得更多选票的答案有点误导。看下面的例子,我的想法是展示一些测试场景,并展示带有运算符andThen 的可完成逻辑的行为。

 private fun doThingAFail(): Completable 
        print("Do thingA Called\n")
        return Completable.fromCallable 
            print("calling stream A\n")
            throw(Exception("The excep"))
        
    

    private fun doThingB(): Completable 
        print("Do thingB Called\n")
        return Completable.fromCallable 
            print("calling stream B\n")

        
    

    private fun doThingA(): Completable 
        print("Do thingA Called\n")
        return Completable.fromCallable 
            print("calling stream A\n")
        
    

请注意以下测试:

@Test
fun testCallAPlusB() 
    doThingA().andThen(doThingB())

输出将是:

Do thingA Called
Do thingB Called

此处快速说明:请注意,我们没有在此 sn-p 中订阅这些 Completable。

为了测试:

@Test
fun theTestSubscribe() 
    doThingA().andThen(doThingB()).subscribe()

输出将是:

Do thingA Called
Do thingB Called
calling stream A
calling stream B

最后,如果第一个 Completable 失败,第二个 Completable 将不会被执行。

@Test
fun theTestFailThingA() 
    doThingAFail().andThen(doThingB()).subscribe()

输出将是:

Do thingA Called
Do thingB Called
calling stream A

这里的关键概念是方法内部和可观察对象内部的逻辑不是同时执行的。 一旦我们调用 doThingA()doThingB() 方法,将打印“Do thingA Called”和“Do thingB Called”行。而“调用流A”和 只有当有人订阅 doThingAdoThingB 方法时,才会调用“调用流 B”行。

这里的第二个概念是 andThen 运算符将如何处理错误。在上面的示例中,如果doThingA() completable 以错误结束,流将结束并且不打印“调用流 B”行。

【讨论】:

【参考方案5】:

我遇到了同样的问题,我使用了操作符 .concactWith 来使它工作。 就我而言,我有两种 Completable 类型的乐趣。

fun makeTwoThings(): Completable 
    makeFirstThing().concatWith(makeSecondThing())


fun makeFirstThing(): Completable
     //TODO()


fun makeSecondThing(): Completable
     //TODO()

【讨论】:

以上是关于如何在 RxJava2 中链接两个 Completable的主要内容,如果未能解决你的问题,请参考以下文章

Android 响应式编程 RxJava2 完全解析

如何正确使用 Java 中 Apollo Client 的 RxJava2 库进行同步调用?

使用 RxJava2 和 Retrofit2 时如何访问响应头?

Rxjava2如何简洁地将结果从前一个节点返回到链中的下一个节点

如何将图像放在容器中间,底部有两个按钮

Rxjava2 Observable的条件操作符详解及实例