RxJava操作符系列一(上)

Posted 新东网技术发布

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava操作符系列一(上)相关的知识,希望对你有一定的参考价值。

前言


第一次接触学习RxJava应该是一两个月前的事情了,但其中也是断断续续,最近又再次去学习RxJava,和当初刚接触RxJava完全不是同样的心情,轻松了很多,也感受到了RxJava的魅力,真是不由衷感叹太牛了。目前关于RxJava的文章也很多,个人推荐两篇扔物线的给 android 开发者的 RxJava 详解和大头鬼Bruce的译文 深入浅出RxJava系列。那么这篇文章通过代码介绍RxJava中的操作符,以及操作符的使用。当然操作符较多,准备分几篇文章介绍。如果你想提前学习其他操作符可以去GitHub,欢迎star项目。


RxJava操作符源码传送门

https://github.com/xiehui999/fuseProgram


基础介绍


为了力求有没有RxJava基础都能看懂此文,简单介绍一下RxJava以及一些名词。在RxJava开源的Github上是这样解释的a library for composing asynchronous and event-based programs using observable sequences for the Java VM。无论多么复杂的逻辑,都可以保持整洁的代码格式。


在RxJava中最重要的就是Observable(被观察者),subscribe(订阅),Observer(观察者)或者Subscriber(订阅者),Observable也就是数据(事件)源,Subscriber负责接收以及处理数据(事件)。当然要想实现两者通信,需要有一种机制那就是订阅。Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。


例如张三(观察者)想看某款新闻软件的科技信息(被观察者),由于科技信息是每天推送或者不定时推送,如果张三一直盯着手机屏幕看并且刷新消息是不是又新的信息,显然不现实。这时候就可以通过张三 subscribe(订阅)科技信息,而实现当有新的科技信息时自动给张三推送消息,在这期间,张三并不需要一直盯着屏幕刷新闻。在RxJava代码书写时应该是张三.subscribe(科技新闻)。


在RxJava中,有三个事件回调方法,分别是onNext(),OnError(),onCompleted(),onNext()是最终输出及处理数据的回调,在发射数据过程中出现错误异常会回调OnError()方法,当不会再有新的 onNext() 发出时,需要触发 onCompleted() 方法作为标志。,OnError()和onCompleted()是互斥的。下面举一个最简单的例子


Observable observable2 = Observable.just("也许当初忙着微笑和哭泣", "忙着追逐天空中的流星", "人理所当然的忘记", "是谁风里雨里一直默默守护在原地");

        Subscriber subscriber = new Subscriber() {

            @Override

            public void onCompleted() {

                Log.e(TAG, "onCompleted: " )

            }

 

            @Override

            public void onError(Throwable e) {

                Log.e(TAG, "onError: ")

            }

 

            @Override

            public void onNext(String s) {

               Log.e(TAG, "onNext: "+s )

            }

        };

        observable.subscribe(subscriber);


运行后打印信息为


onNext: 也许当初忙着微笑和哭泣

onNext: 忙着追逐天空中的流星

onNext: 人理所当然的忘记

onNext: 是谁风里雨里一直默默守护在原地

onCompleted:


Create


我们可以使用该操作符从零开始创建一个Observable,给这个操作符传递一个接受观察者作为参数的函数,并调用观察者的onNext,onError和onCompleted方法。如下


//被观察者

        Observable observable = Observable.create(new Observable.OnSubscribe() {

            @Override

            public void call(Subscriber super String> subscriber) {

                //可以多次调用subscriber.onNext("大家好")发射数据

                subscriber.onNext("大家好");

                subscriber.onNext("我开始学习RxJava");

                subscriber.onCompleted();

 

            }

        });


发送数据需要在毁掉方法call中调用subscriber的onNext(),onNext(T)发送的参数需要和Observable.OnSubscribe()中参数相同,在上面我们传入的是String类型。创建后Observale后,我们需要创建Subscriber(观察者)去处理observable发送的数据。如下


Subscriber subscriber = new Subscriber() {

            @Override

            public void onCompleted() {

                Log.e(TAG, "onCompleted");

            }

            @Override

            public void onError(Throwable e) {

                Log.e(TAG, e.getMessage());

            }

            @Override

            public void onNext(String s) {

                Log.e(TAG, "onNext:"+s);

            }

        };


数据成功发送后,会回调Subscriber的onNext()的方法,其中的参数就是接收到的数据。当onNext()接收数据完毕后会执行onCompleted(),如果中途有环节出现错误异常,会执行onError()。现在观察者和被观察者都创建完毕了,他们执行还需要一个前提就是订阅,如果不订阅,observable并不会发射数据,subscribe也不会接收数据,订阅代码如下


observable.subscribe(subscriber);


执行后输出信息


onNext:大家好

onNext:我开始学习RxJava

onCompleted


from


该操作符是将其它种类的对象和数据类型转换为Observable,如果当你发射的的数据是同一种类型,而不是混合使用Observables和其它类型的数据,会非常方便。如下创建Observable


Integer[] integers = {1,2, 3, 4};

        Observable observable=Observable.from(integers);

         Subscriber subscriber = new Subscriber() {

            @Override

            public void onCompleted() {

                Log.e(TAG, "onCompleted");

            }

            @Override

            public void onError(Throwable e) {

                Log.e(TAG, e.getMessage());

            }

            @Override

            public void onNext(Integer i) {

                Log.e(TAG, "onNext:"+i);

            }

        };

        observable.subscribe(subscriber);


输出信息为


onNext1

onNext2

onNext3

onNext4

onCompleted


from操作符可以转换Future、Iterable和数组。对于Iterable和数组,产生的Observable会发射Iterable或数组的每一项数据。对于Future,它会发射Future.get()方法返回的单个数据,并且还可以增加通过: from(Future,timeout, timeUnit)指定超时时间,如果执行的时候Future超时会回调onError()方法。


just


just将单个数据转换为发射那个数据的Observable,Just类似于From,但是From会将数组或Iterable的数据取出然后逐个发射,而Just只是简单的原样发射,将数组或Iterable当做单个数据,如果你传递null给Just,它会返回一个发射null值的Observable。不要误认为它会返回一个空Observable(完全不发射任何数据的Observable)。对于just可以接收1到10个数据,返回一个按参数列表顺序发射这些数据的Observable。


Observable.just(1 2, 3, 4)

                .subscribe(new SubscriberInteger>() {

                    @Override

                    public void onCompleted() {

                        Log.e(TAG, "onCompleted: ");

                    }

 

                    @Override

                    public void onError(Throwable e) {

                        Log.e(TAG, "onError: ");

                    }

 

                    @Override

                    public void onNext(Integer integer) {

                        Log.e(TAG, "onNext: " + integer);

                    }

                });


输出


onNext:1

onNext:2

onNext:3

onNext:4

onCompleted:


对于just参数类型可以是多种,如下,传入两个类型数据


Observable.just(0, "one", 6, "two", 8, "three")

                .subscribe(new Subscriber() {

                    @Override

                    public void onCompleted() {

                        Log.e(TAG, "onCompleted: " );

                    }

 

                    @Override

                    public void onError(Throwable e) {

                        Log.e(TAG, "onError: ");

                    }

 

                    @Override

                    public void onNext(Serializable serializable) {

                        Log.e(TAG, "onNext: "+serializable.toString());

                    }

                });


则输出信息


onNext:0

onNext:one

onNext:6

onNext:two

onNext:8

onNext:three

onCompleted:


Empty/Never/Error


Empty:创建一个不发射任何数据但是正常终止的Observable,此时会回调onCompleted()


Never:创建一个不发射数据也不终止的Observable


Error:创建一个不发射数据以一个错误终止的Observable


error操作符需要一个Throwable参数,你的Observable会以此终止。这些操作符默认不在任何特定的调度器上执行,但是empty和error有一个可选参数是Scheduler,如果你传递了Scheduler参数,它们会在你指定的调度器上发送通知。


接下文


来源:伯乐在线专栏作者 - Code4Android

以上是关于RxJava操作符系列一(上)的主要内容,如果未能解决你的问题,请参考以下文章

RxJava系列之二 变换类操作符具体解释1

RxJava系列6(从微观角度解读RxJava源码)

RxJava系列4(过滤操作符)

快速上手 Kotlin 开发系列之集合操作符

Android RxJava 实战系列:联合判断

RxJava系列3(转换操作符)