RxJava:如何使用 zip 运算符处理错误?
Posted
技术标签:
【中文标题】RxJava:如何使用 zip 运算符处理错误?【英文标题】:RxJava : How to handle error with zip operator ? 【发布时间】:2017-05-11 14:28:32 【问题描述】:我正在使用带有 Retrofit2 的 RxJava 和 Rxandroid。
Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
在上面的两个观察者上使用如下的 zip 操作符。
Observable<ArrayList<TestData>> testDataObservable = Observable.zip(responseOneObservable, responseTwoObservable, new Func2<ResponseOne, ResponseTwo, ArrayList<TestData>>()
@Override
public ArrayList<TestData> call(ResponseOne responseOne, ResponseTwo responseTwo)
ArrayList<TestData> testDataList = new ArrayList();
// Add test data from response responseOne & responseTwo
return testDataList;
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<ArrayList<TestData>>()
@Override
public void onNext(ArrayList<TestData> testDataList)
@Override
public void onCompleted()
Log.d(TAG, "onCompleted" );
@Override
public void onError(Throwable t)
Log.d(TAG, "onError Throwable: " + t.toString() );
);
如果在responseOneObservable
和responseTwoObservable
改造http调用过程中出现任何错误,则会直接调用testDataObservable
的订阅者的onError
方法。
我想继续使用 zip 运算符的 call
方法,即使两个 observable 中的任何一个都得到了成功响应。
如何使用zip
操作符处理错误响应?
【问题讨论】:
我相信onErrorResumeNext
应该允许你这样做。
如果其中一个可观察对象返回 NULL,你知道该怎么做吗? ***.com/questions/50334430/…
【参考方案1】:
您可以使用onErrorResumeNext
返回一些 Observable 或使用onErrorReturn
向zip
返回一些默认值,例如:
Observable.zip(
responseOneObservable
.onErrorReturn(new Func1<Throwable, ResponseOne>()
@Override
public ResponseOne call(final Throwable throwable)
return new ResponseOne();
),
responseTwoObservable
.onErrorReturn(new Func1<Throwable, ResponseTwo>()
@Override
public ResponseTwo call(final Throwable throwable)
return new ResponseTwo();
),
...
请参阅onError handling 了解更多信息。
更新:对于 RxJava 2.0,您必须使用 Function
而不是 Func1
:
import io.reactivex.functions.Function;
...
Observable.zip(
responseOneObservable
.onErrorReturn(new Function<Throwable, ResponseOne>()
@Override
public ResponseOne apply(@NonNull final Throwable throwable)
return new ResponseOne();
),
responseTwoObservable
.onErrorReturn(new Function<Throwable, ResponseTwo>()
@Override
public ResponseTwo apply(@NonNull final Throwable throwable)
return new ResponseTwo();
),
...
或者使用 lambda:
Observable.zip(
responseOneObservable
.onErrorReturn(throwable -> new ResponseOne()),
responseTwoObservable
.onErrorReturn(throwable -> new ResponseTwo()),
...
或者使用 Kotlin:
Observable.zip(
responseOneObservable
.onErrorReturn ResponseOne() ,
responseTwoObservable
.onErrorReturn ResponseTwo() ,
...
【讨论】:
Func1 在 RxJava 2.0 中已被替换为 Function。参考:github.com/ReactiveX/RxJava/wiki/What's-different-in-2.0【参考方案2】:您可以使用onErrorResumeNext
运算符从两个可观察对象中的任何一个返回默认响应。
Observable<ResponseOne> responseOneObservable = getRetrofitClient().getDataOne()
.onErrorResumeNext(throwable -> /*some default value*/)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
Observable<ResponseTwo> responseTwoObservable = getRetrofitClient().getDataTwo()
.onErrorResumeNext(throwable -> /*some default value*/)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
另见Error handling in RxJava - Dan Lew
【讨论】:
【参考方案3】:您应该在单个压缩的 observable 上使用 onErrorResumeNext
来指示它们在发生错误时发出默认项。
参考Error-Handling-Operators
【讨论】:
以上是关于RxJava:如何使用 zip 运算符处理错误?的主要内容,如果未能解决你的问题,请参考以下文章