RxJava之错误处理
Posted 行云间
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava之错误处理相关的知识,希望对你有一定的参考价值。
在Observable发射数据时,有时发送onError通知,导致观察者不能正常接收数据。可是,有时我们希望对Observable发射的onError通知做出响应或者从错误中恢复。此时我们该如何处理呢?下面我们了解下RxJava错误处理相关的操作符。
catch
流程图
概述
catch操作符拦截原Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。
在RxJava中,catch实现为三个不同的操作符:
- onErrorReturn:让Observable遇到错误时发射一个特殊的项并且正常终止。
- onErrorResumeNext:让Observable在遇到错误时开始发射第二个Observable的数据序列。
- onExceptionResumeNext:让Observable在遇到错误时继续发射后面的数据项。
onErrorReturn
流程图
概述
onErrorReturn方法创建并返回一个拥有类似原Observable的新Observable,后者会忽略前者的onError调用,不会将错误传递给观察者,作为替代,它会通过参数函数,创建一个特殊项并发发射,最后调用观察者的onCompleted方法。
API
Javadoc: onErrorReturn(Func1))
示例代码
Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
subscriber.onNext(getListOfStudent().get(0));
subscriber.onNext(getListOfStudent().get(1));
subscriber.onNext(getListOfStudent().get(2));
subscriber.onError(new Throwable("do onError"));
subscriber.onNext(getListOfStudent().get(3));
subscriber.onNext(getListOfStudent().get(4));
subscriber.onNext(getListOfStudent().get(5));
}
}).subscribeOn(Schedulers.io())
.onErrorReturn(new Func1<Throwable, Student>() {
@Override
public Student call(Throwable throwable) {
return new Student(1001, "error - 1 ", 10);
}
}).observeOn(androidSchedulers.mainThread())
.subscribe(new Subscriber<Student>() {
@Override
public void onStart() {
super.onStart();
mAdaStudent.clear();
}
@Override
public void onCompleted() {
Log.i(TAG, "do onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "do onError");
}
@Override
public void onNext(Student student) {
Log.i(TAG, "do onNext");
mAdaStudent.addData(student);
}
});
Log打印
OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1001'name='error - 1 ', age=10}
OperateActivity: do onCompleted
示例解析
在手动创建Observale时,当Observable发送了第三个数据后,Observable发送了onError通知,然后又发送了2个数据。而在onErrorReturn方法处理中,其参数函数中,创建并返回了一个特殊项( new Student(1001, “error - 1 “, 10)).
从Log打印可以看出,观察者并没有执行onError方法,意味着Observale并没有接收到onError通知,而是接收到了一个特殊项后,调用了onCompleted方法,结束了此次订阅。而这个特殊项,正是在onErrorReturn中参数函数中,创建的特殊项。
onErrorResumeNext
流程图
概述
onErrorResumeNext方法创建并返回一个拥有类似原Observable的新Observable,后者会忽略前者的onError调用,不会将onError通知传递给观察者,但作为替代,=新的Observable开始发射数据。
onErrorResumeNext方法与onErrorReturn()方法类似,都是拦截原Observable的onError通知,不同的是拦截后的处理方式,onErrorReturn创建并返回一个特殊项,而onErrorResumeNext创建并返回一个新的Observabl,观察者会订阅它,并接收其发射的数据。
API
Javadoc: onErrorResumeNext(Func1))
Javadoc: onErrorResumeNext(Observable))
示例代码
Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
subscriber.onNext(getListOfStudent().get(0));
subscriber.onNext(getListOfStudent().get(1));
subscriber.onNext(getListOfStudent().get(2));
subscriber.onError(new Throwable("do onError"));
subscriber.onNext(getListOfStudent().get(3));
subscriber.onNext(getListOfStudent().get(4));
subscriber.onNext(getListOfStudent().get(5));
}
}).subscribeOn(Schedulers.io())
.onErrorResumeNext(new Func1<Throwable, Observable<Student>>() {
@Override
public Observable<Student> call(Throwable throwable) {
return Observable.just(new Student(1001, "error - 1 ", 10), new Student(1002, "error - 2 ", 10));
}
}).observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Student>() {
@Override
public void onStart() {
super.onStart();
mAdaStudent.clear();
}
@Override
public void onCompleted() {
Log.i(TAG, "do onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "do onError");
}
@Override
public void onNext(Student student) {
Log.i(TAG, "do onNext");
Log.i(TAG, student.toString());
mAdaStudent.addData(student);
}
});
Log打印
OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1001'name='error - 1 ', age=10}
OperateActivity: do onNext
Student{id='1002'name='error - 2 ', age=10}
OperateActivity: do onCompleted
示例解析
在手动创建Observale时,当Observable发送了第三个数据后,Observable发送了onError通知,然后又发送了2个数据。在onErrorResumeNext方法中的参数函数中,创建了一个新的Observable。
从Log打印可以看出,观察者并没有执行onError方法,意味着Observale并没有接收到onError通知,而是接收到了新建的创建了一个新的Observable发射的出具。在新Observable发射完数据后,调用了onCompleted方法,结束了此次订阅。
onExceptionResumeNext
流程图
概述
onExceptionResumeNext方法与onErrorResumeNext方法类似创建并返回一个拥有类似原Observable的新Observable,,也使用这个备用的Observable。不同的是,如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。
API
Javadoc: onExceptionResumeNext(Observable))
示例代码
Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
subscriber.onNext(getListOfStudent().get(0));
subscriber.onNext(getListOfStudent().get(1));
subscriber.onNext(getListOfStudent().get(2));
subscriber.onError(new Throwable("do onError"));
subscriber.onNext(getListOfStudent().get(3));
subscriber.onNext(getListOfStudent().get(4));
subscriber.onNext(getListOfStudent().get(5));
}
}).subscribeOn(Schedulers.io())
.onExceptionResumeNext(Observable.just(new Student(1001, "error - 1 ", 10),
new Student(1002, "error - 2 ", 10)))
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Student>() {
@Override
public void onStart() {
super.onStart();
mAdaStudent.clear();
}
@Override
public void onCompleted() {
Log.i(TAG, "do onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "do onError");
}
@Override
public void onNext(Student student) {
Log.i(TAG, "do onNext");
Log.i(TAG, student.toString());
mAdaStudent.addData(student);
}
});
Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
subscriber.onNext(getListOfStudent().get(0));
subscriber.onNext(getListOfStudent().get(1));
subscriber.onNext(getListOfStudent().get(2));
subscriber.onError(new Exception("do onError"));
subscriber.onNext(getListOfStudent().get(3));
subscriber.onNext(getListOfStudent().get(4));
subscriber.onNext(getListOfStudent().get(5));
}
}) ***
Log打印
1.
do onError
2.
OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1001'name='error - 1 ', age=10}
OperateActivity: do onNext
Student{id='1002'name='error - 2 ', age=10}
OperateActivity: do onCompleted
示例解析
在创建Observale发送OnError通知时,error采用了两种方式,一个是Throwable,另外一个是Exception。从打印的Log中可以看出,在采用第一种方式时,原Observable直接发送了onError通知,并结束发射。但是采用发射Exception作为onError通知时,原Observale的onError通知被拦截,并使用了onExceptionResumeNext()创建的备用Observale。正如概述中叙述的,onExceptionResumeNext方法至拦截原Observale中Exception作为onError的通知,并将在参数函数中创建的备用Observable中的数据发射出去。
retry
流程图
概述
retry()操作符将拦截原Observable传递onError给观察者,而是重新订阅此Observable。由于是重新订阅会造成数据重复。
在RxJava中,retry()操作符有几个变体
retry()变体在出现onError通知时,将无限的重新订阅原Observable.
retry(long)变体通过参数指定最多重新订阅的次数,如果次数超了,它不会尝试再次订阅,它会把最新的一个onError通知传递给它的观察者。
retry(Func2)变体通过参数接受两个参数的函数,参数为重试次数和导致发射onError通知的Throwable,而函数返回一个布尔值,如果返回true,retry应该再次订阅原Observable,如果返回false,retry会将最新的一个onError通知传递给它的观察者。
API
Javadoc: retry()
Javadoc: retry(long)
Javadoc: retry(Func2)
示例代码
Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
subscriber.onNext(getListOfStudent().get(0));
subscriber.onNext(getListOfStudent().get(1));
subscriber.onNext(getListOfStudent().get(2));
if (isError) {
subscriber.onError(new Throwable("do onError"));
isError = false;
}
subscriber.onNext(getListOfStudent().get(3));
subscriber.onNext(getListOfStudent().get(4));
subscriber.onNext(getListOfStudent().get(5));
}
}).retry(3)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Student>() {
@Override
public void onStart() {
super.onStart();
mAdaStudent.clear();
}
@Override
public void onCompleted() {
Log.i(TAG, "do onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "do onError");
}
@Override
public void onNext(Student student) {
Log.i(TAG, "do onNext");
Log.i(TAG, student.toString());
mAdaStudent.addData(student);
}
});
Log打印
OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='4'name='D', age=24}
OperateActivity: do onNext
Student{id='5'name='E', age=33}
OperateActivity: do onNext
Student{id='6'name='F', age=23}
示例解析
从示例代码中可以看出,第一次订阅时,发射完第三个通知后,发送onError通知。但,通过Log打印可以清晰的看出,onError通知并没有发射出去,而是重新订阅,将之前发射的数据,重新发了一遍。正如之前说的,retry()操作符会拦截onError通知并重新订阅,但是会造成数据的重复。
retryWhen
流程图
概述
retryWhen()默认在trampoline调度器上执行,可以通过参数指定其它的调度器。
API
Javadoc: retryWhen(Func1)
Javadoc: retryWhen(Func1,Scheduler)
示例代码
1.
Observable.create(new Observable.OnSubscribe<Student>() {
@Override
public void call(Subscriber<? super Student> subscriber) {
subscriber.onNext(getListOfStudent().get(0));
subscriber.onNext(getListOfStudent().get(1));
subscriber.onNext(getListOfStudent().get(2));
if (isError) {
subscriber.onError(new Throwable("do onError"));
isError = false;
}
subscriber.onNext(getListOfStudent().get(3));
subscriber.onNext(getListOfStudent().get(4));
subscriber.onNext(getListOfStudent().get(5));
}
}).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<Student>() {
@Override
public void onStart() {
super.onStart();
mAdaStudent.clear();
}
@Override
public void onCompleted() {
Log.i(TAG, "do onCompleted");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "do onError");
}
@Override
public void onNext(Student student) {
Log.i(TAG, "do onNext");
Log.i(TAG, student.toString());
mAdaStudent.addData(student);
}
});
2.
***
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
@Override
public Observable<Throwable> call(Observable<? extends Throwable> observable) {
return Observable.error(new Throwable(" do retryWhen"));
}
})
****
Log打印
1.
OperateActivity: do onNext
Student{id=’1’name=’A’, age=23}
OperateActivity: do onNext
Student{id=’2’name=’B’, age=33}
OperateActivity: do onNext
Student{id=’3’name=’C’, age=24}
OperateActivity: do onNext
Student{id=’1’name=’A’, age=23}
OperateActivity: do onNext
Student{id=’2’name=’B’, age=33}
OperateActivity: do onNext
Student{id=’3’name=’C’, age=24}
OperateActivity: do onNext
Student{id=’4’name=’D’, age=24}
OperateActivity: do onNext
Student{id=’5’name=’E’, age=33}
OperateActivity: do onNext
Student{id=’6’name=’F’, age=23}
2.
do onError
示例解析
示例1中,在retryWhend(Func1)的参数函数中,创建并返回了一个可发射数据的Observable对象,而在示例2中,其参数函数,创建并返回了一个发射onError通知的Observable。通过Log打印可以出,示例1在拦截了原Observable中的onError通知,并重新订阅了原Observable,但是示例2中,观察者接收了onError通知,意味着原Observable中的onError通知未被拦截,直接发射出去。示例2中,正体现了retryWhen()和retry()的不同之处。
以上是关于RxJava之错误处理的主要内容,如果未能解决你的问题,请参考以下文章
如何取消订阅RxKotlin / RxJava中的Flowable?