RxJava:如何使用 zip 运算符处理错误?

Posted

技术标签:

【中文标题】RxJava:如何使用 zip 运算符处理错误?【英文标题】:RxJava : How to handle error with zip operator ? 【发布时间】:2017-05-11 14:28:32 【问题描述】:

我正在使用带有 Retrofit2 的 RxJava 和 Rxandroid

Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());

在上面的两个观察者上使用如下的 zip 操作符。

 Observable<ArrayList<TestData>> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList<TestData>>() 
            @Override
                public ArrayList<TestData> call(ResponseOne responseOne, ResponseTwo responseTwo) 
                  ArrayList<TestData> testDataList = new ArrayList();
                      // Add test data from response responseOne & responseTwo
                  return testDataList;
             
    )
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Subscriber<ArrayList<TestData>>() 

        @Override
        public void onNext(ArrayList<TestData> testDataList) 

        

        @Override
        public void onCompleted() 
            Log.d(TAG, "onCompleted" );
        

        @Override
        public void onError(Throwable t) 
            Log.d(TAG, "onError Throwable: " + t.toString() );
        
    );

如果在responseOneObservableresponseTwoObservable改造http调用过程中出现任何错误,则会直接调用testDataObservable的订阅者的onError方法。

我想继续使用 zip 运算符的 call 方法,即使两个 observable 中的任何一个都得到了成功响应。

如何使用zip 操作符处理错误响应?

【问题讨论】:

我相信onErrorResumeNext 应该允许你这样做。 如果其中一个可观察对象返回 NULL,你知道该怎么做吗? ***.com/questions/50334430/… 【参考方案1】:

您可以使用onErrorResumeNext 返回一些 Observable 或使用onErrorReturnzip 返回一些默认值,例如:

Observable.zip(
   responseOneObservable
       .onErrorReturn(new Func1<Throwable, ResponseOne>() 
        @Override
        public ResponseOne call(final Throwable throwable) 
            return new ResponseOne();
        
    ),
   responseTwoObservable
       .onErrorReturn(new Func1<Throwable, ResponseTwo>() 
        @Override
        public ResponseTwo call(final Throwable throwable) 
            return new ResponseTwo();
        
    ),
   ...

请参阅onError handling 了解更多信息。


更新:对于 RxJava 2.0,您必须使用 Function 而不是 Func1

import io.reactivex.functions.Function;
...
Observable.zip(
   responseOneObservable
       .onErrorReturn(new Function<Throwable, ResponseOne>() 
        @Override
        public ResponseOne apply(@NonNull final Throwable throwable) 
            return new ResponseOne();
        
    ),
   responseTwoObservable
       .onErrorReturn(new Function<Throwable, ResponseTwo>() 
        @Override
        public ResponseTwo apply(@NonNull final Throwable throwable) 
            return new ResponseTwo();
        
    ),
   ...

或者使用 lambda:

Observable.zip(
   responseOneObservable
       .onErrorReturn(throwable -> new ResponseOne()),
   responseTwoObservable
       .onErrorReturn(throwable -> new ResponseTwo()),
   ...

或者使用 Kotlin:

Observable.zip(
   responseOneObservable
       .onErrorReturn  ResponseOne() ,
   responseTwoObservable
       .onErrorReturn  ResponseTwo() ,
   ...

【讨论】:

Func1 在 RxJava 2.0 中已被替换为 Function。参考:github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0【参考方案2】:

您可以使用onErrorResumeNext 运算符从两个可观察对象中的任何一个返回默认响应。

Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
    .onErrorResumeNext(throwable -> /*some default value*/)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
    .onErrorResumeNext(throwable -> /*some default value*/)
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread());

另见Error handling in RxJava - Dan Lew

【讨论】:

【参考方案3】:

您应该在单个压缩的 observable 上使用 onErrorResumeNext 来指示它们在发生错误时发出默认项。

参考Error-Handling-Operators

【讨论】:

以上是关于RxJava:如何使用 zip 运算符处理错误?的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava并行执行耗时操作使用zip和merge

Rxjava并行执行耗时操作使用zip和merge

如何在Activity中使用Retrofit和RxJava / RxAndroid处理旋转?

RxJava 错误处理

使用 Kotlin 处理错误 RXJava Android

使用改造对 mvvm 中的 Rxjava 进行错误处理