RXJAVA的使用

Posted 技术丶从积累开始

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RXJAVA的使用相关的知识,希望对你有一定的参考价值。

参照文档:http://gank.io/post/560e15be2dca930e00da1083

一、什么是观察者模式

就是观察者告诉被观察者我需要你的某种状态,当被观察者做出动作的时候,观察者立刻做出相应反应。所以步骤就是,创建被观察者,逻辑是它要执行的动作。创建观察者,当被观察者做出动作的时候,观察者该怎么做。之后观察者要盯着观察者,这就是连接。

二、RXJAVA中的观察者和被观察者

Observable(被观察者)     Observer/Subscribe(观察者)

三、创建 被观察者  和 观察者 并建立连接

    @Override
    protected void onCreate(Bundle savedInstanceState) {
        super.onCreate(savedInstanceState);
        setContentView(R.layout.activity_main);
        //创建被观察者方式一,运用OnSubscribe创造自己的逻辑在其中
        Observable observable = Observable.create(new Observable.OnSubscribe<String>(){
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("123");
                subscriber.onNext("321");
                subscriber.onCompleted();
            }
        });
        //方式二:
        Observable observable1 = Observable.just("123","321");
        //方式三:
        String [] str = {"123","321"};
        Observable observable2 = Observable.from(str);
        /*
        * 最终的结果都是:
        * subscriber.onNext("123");
          subscriber.onNext("321");
          subscriber.onCompleted();
          这个顺序调用的,这里的subscriber就是注册的观察者
        * */
        

        //观察者,当被观察者做出动作了,观察者要做的事情逻辑
        Observer<String> observer = new Observer<String>() {
            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                //当返回数据的时候
            }
        };
        //观察者和被观察者建立连接
        observable.subscribe(observer);
    }
简单使用
Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码):

// 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public Subscription subscribe(Subscriber subscriber) {
    subscriber.onStart();
    onSubscribe.call(subscriber);
    return subscriber;
}

可以看到subscriber() 做了3件事,但为什么还要返回subscriber这个对象?

这是为了方便 unsubscribe(); 那么unsubscribe()的优点在哪里?

用来及时回收,防止内存溢出。  在什么时候使用比较好?

在onPause()和onStop()的时候比较好。

四、定义不完整回调

什么叫不完整回调,就是创建观察者的话,需要重写三个方法,不完整回调就是,只要重写其中的一个方法就可以了。

用到的类型:Action类   延生类:Action1 Action2 .... ActionX

他们的区别在哪里:传入的参数个数,Action0表示,不传入参数,Action1表示传入一个参数。

Action类中只有一个call()方法,根据放入Observable类摆放顺序,决定其充当Observer类中的哪种方法。(onNext(),onError(),onComplete())

举例:

  //没有传入参数
        Action0 action0 = new Action0() {
            @Override
            public void call() {

            }
        };
        //只有一个输入参数
        Action1<String> action1 = new Action1<String>() {

            @Override
            public void call(String s) {
                //没有东西
            }
        };
        //现在有两个输入参数了
        Action2<String,String> action2 = new Action2<String, String>() {
            @Override
            public void call(String s, String s2) {

            }
        };
        Observable observable = Observable.just("asd","asd");
        //注意事项:根据Observer的结构,发现onNext()必须得有一个参数,所以必须使用Action1
        //同理:onError也必须使用Action1       onComplete()只能用Action0
        observable.subscribe(action1);
        observable.subscribe(action1,action1);
        observable.subscribe(action1,action1,action0);
        //这样之后就按照顺序传参,可以是一个或者多个。不完整回调的时候,Observable会自动创建不完整的部分。
不完整回调

小练习:将字符串全部打印出来

 //采用第三种创建方式
        String [] str = {"你好","Hello","I am you friends"};
        Observable<String> observable = Observable.from(str);
        //观察者
        Observer<String> observer = new Subscriber<String>() {
            @Override
            public void onCompleted() {
                Log.d("MainActivity","再见");
            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onNext(String s) {
                Log.d("MainActivity",s);
            }
        };
        //注册
        observable.subscribe(observer);
练习

五、线程控制(Scheduler)

观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。

在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler (调度器)。

api介绍:

  • Schedulers.immediate(): 直接在当前线程运行,相当于不指定线程。这是默认的 Scheduler
  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
  • 另外, android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了。 * subscribeOn(): 指定subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程。或者叫做事件产生的线程。 * observeOn(): 指定Subscriber 所运行在的线程。或者叫做事件消费的线程。

//所以只要在刚才练习的过程中加上
 //指定在线程池中发生
observable.subscribeOn(Schedulers.io());
//指定在主线程中消费
observable.observeOn(AndroidSchedulers.mainThread());

//这样Observable就会在线程池中运行,Observer在主线程中执行
举例

六、变换

变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列

①、Observable.map()  (解决一对一事件的转换)

Observable.just("images/logo.png") // 输入类型 String
    .map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String filePath) { // 参数类型 String
            return getBitmapFromPath(filePath); // 返回类型 Bitmap
        }
    })
    .subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) { // 参数类型 Bitmap
            showBitmap(bitmap);
        }
    });

发现通过map()方法,将一个String类型的变量,转换成了Bitmap类型。之后再通过观察者执行。

知识:Func类:与Action类是一样的,但是Func类有返回值,但是Action类没有而已。

问题:map()只能接收一个对象(一对一的转化),当我有多个事件序列的时候,我该怎么办(就是有多个String 的时候),还有当我有一个student对象,该对象对应着多个课程(一对多的转换),我怎么将这么多的课程传给subscribe();

②、 flatMap()   (解决一对多事件的转换)

Student[] students = ...;
Subscriber<Course> subscriber = new Subscriber<Course>() {
    @Override
    public void onNext(Course course) {
        Log.d(tag, course.getName());
    }
    ...
};
Observable.from(students)
    .flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.getCourses());//①发现这里返回的是一个Observable
        }
    })
    .subscribe(subscriber);

     ①、原理:flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法。而是将它激活,于是它开始发送事件,每一个被Observable创建出来的 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法。

理论解析:把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去。

③、lift()   (代理模式转换)   (最好画图理解)

// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
// 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
    return Observable.create(new OnSubscribe<R>() {
        @Override
        public void call(Subscriber subscriber) {
            Subscriber newSubscriber = operator.call(subscriber);
            newSubscriber.onStart();
            onSubscribe.call(newSubscriber);
        }
    });
}

发现,该代码返回了一个Obervable对象。原理:①、当调用lift()方法的时候,创建一个Observable,之后再调用subcribe()的时候,建立联系的Observer是放在新的Obserable上的,然后根据

observable.lift(new Observable.Operator<String, Integer>() {
    @Override
    public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
        // 将事件序列中的 Integer 对象转换为 String 对象
        return new Subscriber<Integer>() {
            @Override
            public void onNext(Integer integer) {
                subscriber.onNext("" + integer);
            }

            @Override
            public void onCompleted() {
                subscriber.onCompleted();
            }

            @Override
            public void onError(Throwable e) {
                subscriber.onError(e);
            }
        };
    }
});

发现在operator实现逻辑之后,才调用Observer,之后再创建一个Observer(就变成旧Obserable的Observer了),最后调用旧Obserable的call()方法。

这是一种代理模式,比如说onNext()方法,Obserable直接调用onNext(),中间出现一个代理,也是onNext()方法,其中实现了自己的逻辑,最后再调用之前的onNext()方法。

就相当于,同样的方法名,被加上了一层逻辑,但是之前的调用方式还是一样的。这个代理就是operate类。

注:代理机制常用于通过事件拦截和处理实现事件序列的变换。

 

个人感觉RXJAVA的强大之处:①、能够在一个界面实现异步加载。②、利用变形实现逻辑上嵌套的解耦

以上是关于RXJAVA的使用的主要内容,如果未能解决你的问题,请参考以下文章

使用 RxJava 将数据从 ViewModel 移动到 Fragment

如何在基础活动暂停时暂停rxjava Observable.interval

导航到另一个片段时触发 API 调用

如何在Activity中使用Retrofit和RxJava / RxAndroid处理旋转?

从 recyclerview 的适配器访问父片段方法

RxJava defer操作符实现代码支持链式调用