RxJava 错误处理

Posted 一叶飘舟

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava 错误处理相关的知识,希望对你有一定的参考价值。

RxJava系列教程:

1. RxJava使用介绍 【视频教程】
2. RxJava操作符
  • Creating Observables(Observable的创建操作符) 【视频教程】
  • Transforming Observables(Observable的转换操作符) 【视频教程】
  • Filtering Observables(Observable的过滤操作符) 【视频教程】
  • Combining Observables(Observable的组合操作符) 【视频教程】
  • Error Handling Operators(Observable的错误处理操作符) 【视频教程】
  • Observable Utility Operators(Observable的辅助性操作符) 【视频教程】
  • Conditional and Boolean Operators(Observable的条件和布尔操作符) 【视频教程】
  • Mathematical and Aggregate Operators(Observable数学运算及聚合操作符) 【视频教程】
  • 其他如observable.toList()、observable.connect()、observable.publish()等等; 【视频教程】
3. RxJava Observer与Subcriber的关系 【视频教程】
4. RxJava线程控制(Scheduler) 【视频教程】
5. RxJava 并发之数据流发射太快如何办(背压(Backpressure)) 【视频教程】


一般来说,Observable不会抛异常。它会调用 onError 终止Observable序列,以此通知所有的观察者发生了一个不可恢复的错误。 但是,也存在一些异常。例如,如果 onError 调用失败了,Observable不会尝试再次调用 onError 去通知观察者,它会抛出 RuntimeException,OnErrorFailedException 或者 OnErrorNotImplementedException。

有时我们希望观察者或者操作符应该对异常发生时的 onError 通知做出合适的响应,而不是捕获(catch)异常。很多操作符可用于对Observable发射的onError通知做出响应或者从错误中恢复,例如,你可以:

  • 吞掉这个错误,切换到一个备用的Observable继续发射数据
  • 吞掉这个错误然后发射默认值
  • 吞掉这个错误并立即尝试重启这个Observable
  • 吞掉这个错误,在一些回退间隔后重启这个Observable

我们可以使用Error handling相关的操作符来集中统一地处理错误。RxJava中错误处理的操作符为 Catch和 Retry。

Catch

  Catch操作符能够拦截原始Observable的onError通知,不让Observable因为产生错误而终止。相当于Java中try/catch操作,不能因为抛异常而导致程序崩溃。
  
这里写图片描述

RxJava将Catch实现为三个不同的操作符:

onErrorReturn:让Observable遇到错误时发射一个特殊的项并且正常终止。
onErrorResumeNext:让Observable在遇到错误时开始发射第二个Observable的数据序列。
onExceptionResumeNext:让Observable在遇到错误时继续发射后面的数据项。

onErrorReturn

这里写图片描述

onErrorReturn方法 返回一个镜像原有Observable行为的新Observable
会忽略前者的onError调用,不会将错误传递给观察者,而是发射一个特殊的项并调用观察者的onCompleted方法。

API

Javadoc: onErrorReturn(Func1))

示例代码

/*
 * onErrorReturn:
 * 返回一个原有Observable行为的新Observable镜像,
 * 后者会忽略前者的onError调用,不会将错误传递给观察者,
 * 作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法
 */
createObserver()
        //作为替代,它会发发射一个特殊的项并调用观察者的onCompleted方法。
        .onErrorReturn(new Func1<Throwable, String>() {

            @Override
            public String call(Throwable throwable) {

                return "do something";
            }
        })
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

这里创建Observable的方法(2个)如下:

private static Observable<String> createObserver() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 1; i <= 6; i++) {
                    if (i < 3) {
                        subscriber.onNext(i+"");
                    } else {
                        //会忽略onError调用,不会将错误传递给观察者
                        subscriber.onError(new Throwable("Throw error"));
                    }
                }
            }
        });
    }

    private static Observable<String> createObserver2() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                for (int i = 1; i <= 6; i++) {
                    if (i < 3) {
                        subscriber.onNext("onNext:" + i);
                    } else {
                        subscriber.onError(new Exception("the nubmer is greater than 3"));
                        //下面写法也是可以的
                        /*try {
                            throw new Exception("the nubmer is greater than 3");

                        } catch (Exception e) {
                            subscriber.onError(e);
                        }*/
                    }
                }
            }
        });
    }

输出结果如下:

onSuccess value = 1
onSuccess value = 2
onSuccess value = do something
onCompleted

在手动创建Observale时,当Observable发送了第二个数据后,Observable发送了onError通知,然后又发送了1个数据。而在onErrorReturn方法处理中,其参数函数中,创建并返回了一个特殊项( do something).

从Log打印可以看出,观察者并没有执行onError方法,意味着Observale并没有接收到onError通知,而是接收到了一个特殊项后,调用了onCompleted方法,结束了此次订阅。而这个特殊项,正是在onErrorReturn中参数函数中,创建的特殊项。

onErrorResumeNext

这里写图片描述

onErrorResumeNext方法与onErrorReturn()方法类似,都是拦截原Observable的onError通知,不同的是拦截后的处理方式,onErrorReturn创建并返回一个特殊项,而onErrorResumeNext创建并返回一个新的Observabl,观察者会订阅它,并接收其发射的数据。

API

Javadoc: onErrorResumeNext(Func1))
Javadoc: onErrorResumeNext(Observable))

示例代码

createObserver()
        .onErrorResumeNext(new Func1<Throwable, Observable<? extends String>>() {

            @Override
            public Observable<String> call(Throwable t) {
                return Observable.just("a","b","c");
            }
        })
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

输出结果如下:

onSuccess value = 1
onSuccess value = 2
onSuccess value = a
onSuccess value = b
onSuccess value = c
onCompleted

在手动创建Observale时,当Observable发送了第二个数据后,Observable发送了onError通知,然后又发送了3个数据。在onErrorResumeNext方法中的参数函数中,创建了一个新的Observable。

从Log打印可以看出,观察者并没有执行onError方法,意味着Observale并没有接收到onError通知,而是接收到了新建的创建了一个新的Observable发射的出具。在新Observable发射完数据后,调用了onCompleted方法,结束了此次订阅。

onExceptionResumeNext

这里写图片描述

onExceptionResumeNext方法与onErrorResumeNext方法类似创建并返回一个拥有类似原Observable的新Observable,,也使用这个备用的Observable。不同的是,如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。

这里要普及一个概念,Java的异常分为错误(error)和异常(exception)两种,它们都是继承于Throwable类。

  • 错误(error)一般是比较严重的系统问题,比如我们经常遇到的OutOfMemoryError、StackOverflowError等都是错误。错误一般继承于Error类,而Error类又继承于Throwable类,如果需要捕获错误,需要使用try..catch(Error e)或者try..catch(Throwable e)句式。使用try..catch(Exception e)句式无法捕获错误

  • 异常(Exception)也是继承于Throwable类,一般是根据实际处理业务抛出的异常,分为运行时异常(RuntimeException)和普通异常。普通异常直接继承于Exception类,如果方法内部没有通过try..catch句式进行处理,必须通过throws关键字把异常抛出外部进行处理(即checked异常);而运行时异常继承于RuntimeException类,如果方法内部没有通过try..catch句式进行处理,不需要显式通过throws关键字抛出外部,如IndexOutOfBoundsException、NullPointerException、ClassCastException等都是运行时异常,当然RuntimeException也是继承于Exception类,因此是可以通过try..catch(Exception e)句式进行捕获处理的。

API

Javadoc: onExceptionResumeNext(Observable))

示例代码

/*
 * onExceptionResumeNext:
 * 和onErrorResumeNext类似,可以说是onErrorResumeNext的特例,
 * 区别是如果onError收到的Throwable不是一个Exception,它会将错误传递给观察者的onError方法,不会使用备用的Observable。
 */
createObserver()
        .onExceptionResumeNext(Observable.just("www.stay4it.com"))
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

输出结果如下:

onSuccess value = 1
onSuccess value = 2
onError error = java.lang.Throwable: Throw error

从Log打印可以看出,没有使用备用的Observable,这是因为onError收到的Throwable不是一个Exception,所以将错误传递给观察者的onError方法。那么怎么才能使用备用的Observable呢?代码如下:

createObserver2()// 注意这里
        .onExceptionResumeNext(Observable.just("www.stay4it.com"))
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

输出结果如下:

onSuccess value = onNext:1
onSuccess value = onNext:2
onSuccess value = www.stay4it.com
onCompleted

Retry

顾名思义,retry的意思就是试着重来,当原始Observable发射onError通知时,retry操作符不会让onError通知传递给观察者,它会重新订阅这个Observable一次或者多次(意味着重新从头发射数据),所以可能造成数据项重复发送的情况。

如果重新订阅了指定的次数还是发射了onError通知,将不再尝试重新订阅,它会把最新的一个onError通知传递给观察者。

这里写图片描述

RxJava中将Retry操作符的实现为retry和retryWhen两种。

retry操作符默认在trampoline调度器上执行。

  • Javadoc: retry():无论收到多少次onError通知,都会继续订阅并重发原始Observable,直到onCompleted。

  • Javadoc: retry(long):接受count参数的retry会最多重新订阅count次,如果次数超过了就不会尝试再次订阅,它会把最新的一个onError通知传递给他的观察者。

  • Javadoc: retry(Func2): 这个版本的retry接受一个谓词函数作为参数,这个函数的两个参数是:重试次数和导致发射onError通知的Throwable。这个函数返回一个布尔值,如果返回true,retry应该再次订阅和镜像原始的Observable,如果返回false,retry会将最新的一个onError通知传递给它的观察者。

API

Javadoc: retry()
Javadoc: retry(long)
Javadoc: retry(Func2)

示例代码

/**
 * retry()
 * 无限次尝试重新订阅
 */
createObserver()
        .retry()
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

/**
 * retry(count)
 * 最多2次尝试重新订阅
 */
createObserver()
        .retry(2)
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

/**
 * retry(Func2)
 */
createObserver()
        .retry(new Func2<Integer, Throwable, Boolean>() {

            @Override
            public Boolean call(Integer t1, Throwable throwable) {
                System.out.println("发生错误了:"+throwable.getMessage()+",第"+t1+"次重新订阅");    
                if(t1>2){
                    return false;//不再重新订阅
                }
                //此处也可以通过判断throwable来控制不同的错误不同处理
                return true;
            }
        })
        .subscribe(new Subscriber<String>() {

            @Override
            public void onCompleted() {
                System.out.println("onCompleted");      
            }

            @Override
            public void onNext(String value) {
                System.out.println("onSuccess value = " + value);
            }

            @Override
            public void onError(Throwable error) {
                System.out.println("onError error = " + error);
            }
        });

输出结果如下:

onSuccess value = 1
onSuccess value = 2
onSuccess value = 1
onSuccess value = 2
…无限次



onSuccess value = 1
onSuccess value = 2
onSuccess value = 1
onSuccess value = 2
onSuccess value = 1
onSuccess value = 2
onError error = java.lang.Throwable: Throw error



onSuccess value = 1
onSuccess value = 2
发生错误了:Throw error,第1次重新订阅
onSuccess value = 1
onSuccess value = 2
发生错误了:Throw error,第2次重新订阅
onSuccess value = 1
onSuccess value = 2
发生错误了:Throw error,第3次重新订阅
onError error = java.lang.Throwable: Throw error

retryWhen

这里写图片描述

retryWhen和retry类似,区别是,retryWhen将onError中的Throwable传递给一个函数,这个函数产生另一个Observable,retryWhen观察它的结果再决定是不是要重新订阅原始的Observable。如果这个Observable发射了一项数据,它就重新订阅,如果这个Observable发射的是onError通知,它就将这个通知传递给观察者然后终止。
  
retryWhen()默认在trampoline调度器上执行,可以通过参数指定其它的调度器。

API

Javadoc: retryWhen(Func1)
Javadoc: retryWhen(Func1,Scheduler)

示例代码

int retryCount = 0;
final int maxRetries = 3;

        Observable.create(new Observable.OnSubscribe<Integer>() {
            @Override
            public void call(Subscriber<? super Integer> subscriber) {
                subscriber.onError(new RuntimeException("always fails"));
            }
        })
        .subscribeOn(Schedulers.immediate())
        .retryWhen(new Func1<Observable<? extends Throwable>, Observable<?>>() {

                    @Override
                    public Observable<?> call(Observable<? extends Throwable> observable) {
                        return observable.flatMap(new Func1<Throwable, Observable<?>>() {
                            @Override
                            public Observable<?> call(Throwable throwable) {
                                if (++retryCount <= maxRetries) {
                                    // When this Observable calls onNext, the original Observable will be retried (i.e. re-subscribed).
                                    System.out.println("get error, it will try after " + 1000 + " millisecond, retry count " + retryCount);
                                    return Observable.timer(1000, TimeUnit.MILLISECONDS);
                                }
                                return Observable.error(throwable);
                            }
                        });
                    }


                })
                .subscribe(new Subscriber<Integer>() {

                    @Override
                    public void onCompleted() {
                        System.out.println("onCompleted");
                    }

                    @Override
                    public void onNext(Integer value) {
                        System.out.println("onSuccess value = " + value);
                    }

                    @Override
                    public void onError(Throwable error) {
                        System.out.println("onError error = " + error);
                    }
                });

输出结果如下:

get error, it will try after 1000 millisecond, retry count 1
get error, it will try after 1000 millisecond, retry count 2
get error, it will try after 1000 millisecond, retry count 3
onError error = java.lang.RuntimeException: always fails

以上是关于RxJava 错误处理的主要内容,如果未能解决你的问题,请参考以下文章

RxJava:如何使用 zip 运算符处理错误?

RxJava之错误处理

使用 Kotlin 处理错误 RXJava Android

Android :RxJava学习笔记之 错误处理

使用 Zip 运算符、Rxjava 和 Retrofit 处理错误

使用改造对 mvvm 中的 Rxjava 进行错误处理