RxJava之错误处理

Posted 行云间

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava之错误处理相关的知识,希望对你有一定的参考价值。

在Observable发射数据时,有时发送onError通知,导致观察者不能正常接收数据。可是,有时我们希望对Observable发射的onError通知做出响应或者从错误中恢复。此时我们该如何处理呢?下面我们了解下RxJava错误处理相关的操作符。

catch

流程图

这里写图片描述

概述

catch操作符拦截原Observable的onError通知,将它替换为其它的数据项或数据序列,让产生的Observable能够正常终止或者根本不终止。

在RxJava中,catch实现为三个不同的操作符:

  • onErrorReturn:让Observable遇到错误时发射一个特殊的项并且正常终止。
  • onErrorResumeNext:让Observable在遇到错误时开始发射第二个Observable的数据序列。
  • onExceptionResumeNext:让Observable在遇到错误时继续发射后面的数据项。

onErrorReturn

流程图

这里写图片描述

概述

onErrorReturn方法创建并返回一个拥有类似原Observable的新Observable,后者会忽略前者的onError调用,不会将错误传递给观察者,作为替代,它会通过参数函数,创建一个特殊项并发发射,最后调用观察者的onCompleted方法。

API

Javadoc: onErrorReturn(Func1))

示例代码

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        subscriber.onError(new Throwable("do onError"));
        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .onErrorReturn(new Func1<Throwable, Student>() {
            @Override
            public Student call(Throwable throwable) {
                return new Student(1001, "error - 1 ", 10);
            }
        }).observeOn(androidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                mAdaStudent.addData(student);
            }
        });

Log打印

OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1001'name='error - 1 ', age=10}
OperateActivity: do onCompleted

示例解析

在手动创建Observale时,当Observable发送了第三个数据后,Observable发送了onError通知,然后又发送了2个数据。而在onErrorReturn方法处理中,其参数函数中,创建并返回了一个特殊项( new Student(1001, “error - 1 “, 10)).

从Log打印可以看出,观察者并没有执行onError方法,意味着Observale并没有接收到onError通知,而是接收到了一个特殊项后,调用了onCompleted方法,结束了此次订阅。而这个特殊项,正是在onErrorReturn中参数函数中,创建的特殊项。

onErrorResumeNext

流程图

这里写图片描述

概述

onErrorResumeNext方法创建并返回一个拥有类似原Observable的新Observable,后者会忽略前者的onError调用,不会将onError通知传递给观察者,但作为替代,=新的Observable开始发射数据。

onErrorResumeNext方法与onErrorReturn()方法类似,都是拦截原Observable的onError通知,不同的是拦截后的处理方式,onErrorReturn创建并返回一个特殊项,而onErrorResumeNext创建并返回一个新的Observabl,观察者会订阅它,并接收其发射的数据。

API

Javadoc: onErrorResumeNext(Func1))
Javadoc: onErrorResumeNext(Observable))

示例代码

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        subscriber.onError(new Throwable("do onError"));
        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .onErrorResumeNext(new Func1<Throwable, Observable<Student>>() {
            @Override
            public Observable<Student> call(Throwable throwable) {
                return Observable.just(new Student(1001, "error - 1 ", 10), new Student(1002, "error - 2 ", 10));
            }
        }).observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
            });

Log打印

OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1001'name='error - 1 ', age=10}
OperateActivity: do onNext
Student{id='1002'name='error - 2 ', age=10}
OperateActivity: do onCompleted

示例解析

在手动创建Observale时,当Observable发送了第三个数据后,Observable发送了onError通知,然后又发送了2个数据。在onErrorResumeNext方法中的参数函数中,创建了一个新的Observable。

从Log打印可以看出,观察者并没有执行onError方法,意味着Observale并没有接收到onError通知,而是接收到了新建的创建了一个新的Observable发射的出具。在新Observable发射完数据后,调用了onCompleted方法,结束了此次订阅。

onExceptionResumeNext

流程图

这里写图片描述

概述

onExceptionResumeNext方法与onErrorResumeNext方法类似创建并返回一个拥有类似原Observable的新Observable,,也使用这个备用的Observable。不同的是,如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。

API

Javadoc: onExceptionResumeNext(Observable))

示例代码

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        subscriber.onError(new Throwable("do onError"));
        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .onExceptionResumeNext(Observable.just(new Student(1001, "error - 1 ", 10),
                new Student(1002, "error - 2 ", 10)))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
        });

 Observable.create(new Observable.OnSubscribe<Student>() {
        @Override
        public void call(Subscriber<? super Student> subscriber) {
            subscriber.onNext(getListOfStudent().get(0));
            subscriber.onNext(getListOfStudent().get(1));
            subscriber.onNext(getListOfStudent().get(2));
            subscriber.onError(new Exception("do onError"));
            subscriber.onNext(getListOfStudent().get(3));
            subscriber.onNext(getListOfStudent().get(4));
            subscriber.onNext(getListOfStudent().get(5));
        }
    }) ***

Log打印

1.
do onError  
2.
OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1001'name='error - 1 ', age=10}
OperateActivity: do onNext
Student{id='1002'name='error - 2 ', age=10}
OperateActivity: do onCompleted

示例解析

在创建Observale发送OnError通知时,error采用了两种方式,一个是Throwable,另外一个是Exception。从打印的Log中可以看出,在采用第一种方式时,原Observable直接发送了onError通知,并结束发射。但是采用发射Exception作为onError通知时,原Observale的onError通知被拦截,并使用了onExceptionResumeNext()创建的备用Observale。正如概述中叙述的,onExceptionResumeNext方法至拦截原Observale中Exception作为onError的通知,并将在参数函数中创建的备用Observable中的数据发射出去。

retry

流程图

这里写图片描述

概述

retry()操作符将拦截原Observable传递onError给观察者,而是重新订阅此Observable。由于是重新订阅会造成数据重复。

在RxJava中,retry()操作符有几个变体

retry()变体在出现onError通知时,将无限的重新订阅原Observable.

retry(long)变体通过参数指定最多重新订阅的次数,如果次数超了,它不会尝试再次订阅,它会把最新的一个onError通知传递给它的观察者。

retry(Func2)变体通过参数接受两个参数的函数,参数为重试次数和导致发射onError通知的Throwable,而函数返回一个布尔值,如果返回true,retry应该再次订阅原Observable,如果返回false,retry会将最新的一个onError通知传递给它的观察者。

API

Javadoc: retry()
Javadoc: retry(long)
Javadoc: retry(Func2)

示例代码

Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        if (isError) {
            subscriber.onError(new Throwable("do onError"));
            isError = false;
        }

        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).retry(3)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
        });

Log打印

OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='1'name='A', age=23}
OperateActivity: do onNext
Student{id='2'name='B', age=33}
OperateActivity: do onNext
Student{id='3'name='C', age=24}
OperateActivity: do onNext
Student{id='4'name='D', age=24}
OperateActivity: do onNext
Student{id='5'name='E', age=33}
OperateActivity: do onNext
Student{id='6'name='F', age=23}

示例解析

从示例代码中可以看出,第一次订阅时,发射完第三个通知后,发送onError通知。但,通过Log打印可以清晰的看出,onError通知并没有发射出去,而是重新订阅,将之前发射的数据,重新发了一遍。正如之前说的,retry()操作符会拦截onError通知并重新订阅,但是会造成数据的重复。

retryWhen

流程图

这里写图片描述

概述

retryWhen()默认在trampoline调度器上执行,可以通过参数指定其它的调度器。

API

Javadoc: retryWhen(Func1)
Javadoc: retryWhen(Func1,Scheduler)

示例代码

1.
Observable.create(new Observable.OnSubscribe<Student>() {
    @Override
    public void call(Subscriber<? super Student> subscriber) {
        subscriber.onNext(getListOfStudent().get(0));
        subscriber.onNext(getListOfStudent().get(1));
        subscriber.onNext(getListOfStudent().get(2));
        if (isError) {
            subscriber.onError(new Throwable("do onError"));
            isError = false;
        }

        subscriber.onNext(getListOfStudent().get(3));
        subscriber.onNext(getListOfStudent().get(4));
        subscriber.onNext(getListOfStudent().get(5));
    }
}).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Subscriber<Student>() {

            @Override
            public void onStart() {
                super.onStart();
                mAdaStudent.clear();
            }

            @Override
            public void onCompleted() {
                Log.i(TAG, "do onCompleted");
            }

            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "do onError");
            }

            @Override
            public void onNext(Student student) {
                Log.i(TAG, "do onNext");
                Log.i(TAG, student.toString());
                mAdaStudent.addData(student);
            }
        });
2.
***
.retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {
    @Override
    public Observable<Throwable> call(Observable<? extends Throwable> observable) {
        return Observable.error(new Throwable(" do retryWhen"));
    }
})
****

Log打印

1.
OperateActivity: do onNext
Student{id=’1’name=’A’, age=23}
OperateActivity: do onNext
Student{id=’2’name=’B’, age=33}
OperateActivity: do onNext
Student{id=’3’name=’C’, age=24}
OperateActivity: do onNext
Student{id=’1’name=’A’, age=23}
OperateActivity: do onNext
Student{id=’2’name=’B’, age=33}
OperateActivity: do onNext
Student{id=’3’name=’C’, age=24}
OperateActivity: do onNext
Student{id=’4’name=’D’, age=24}
OperateActivity: do onNext
Student{id=’5’name=’E’, age=33}
OperateActivity: do onNext
Student{id=’6’name=’F’, age=23}
2.
do onError

示例解析

示例1中,在retryWhend(Func1)的参数函数中,创建并返回了一个可发射数据的Observable对象,而在示例2中,其参数函数,创建并返回了一个发射onError通知的Observable。通过Log打印可以出,示例1在拦截了原Observable中的onError通知,并重新订阅了原Observable,但是示例2中,观察者接收了onError通知,意味着原Observable中的onError通知未被拦截,直接发射出去。示例2中,正体现了retryWhen()和retry()的不同之处。

以上是关于RxJava之错误处理的主要内容,如果未能解决你的问题,请参考以下文章

如何取消订阅RxKotlin / RxJava中的Flowable?

RxJava 错误处理

RxJava:如何使用 zip 运算符处理错误?

使用 Kotlin 处理错误 RXJava Android

使用 Zip 运算符、Rxjava 和 Retrofit 处理错误

使用改造对 mvvm 中的 Rxjava 进行错误处理