RxJava——响应式和区域化的优秀框架(java&android)

Posted 夜辉疾风

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava——响应式和区域化的优秀框架(java&android)相关的知识,希望对你有一定的参考价值。

高能预警:对准备了解RxJava 的程序员一个入口

一直听说Rxjava和RxBinding 在java和android开发中非常的牛逼。主要的优点是响应式编程,逻辑区域化。

今天,我也来说一说关于Rxjava的使用方法。后期会逐步讲到RxBinding在android中的使用和Rxjava同Retrofit结合的例子。

最先在网上看其他人的博客,总感觉讲的不够详细也看的云里雾里。后来观摩的是Rxjava内测程序员“大头鬼”翻译的Rxjava的使用方法博客。看后感觉心中对Rxjava毛瑟顿开。

再后来了解了“扔物线”写的文章,详细的介绍了Rxjava的用法。看完后就对这个框架无法自拔。

这里先贴上他们的博客。不过我还是建议先看完我的博客,再去看“大头鬼”翻译的博客,最后加深了解看“扔物线”的文章。

‘大头鬼Bruce’ RxJava
《给 Android 开发者的 RxJava 详解》 作者:扔物线

切入正题:

基本概念:RxJava 以观察者(Observer)和订阅者(Subscriber)为基础的异步响应方式

1,创建一个Observable对象

有几种方法:

//创建一个Observable对象很简单,直接调用Observable.create即可
        Observable<String> mObservable1 = Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("aaa");
                subscriber.onCompleted();
                subscriber.onError(new Throwable("执行异常信息"));
            }
        });
//创建Observable对象可以简化成一行代码
        Observable<String> mObservable1 = Observable.just("Hello, just world!");

2,订阅Observable

注意:订阅Observable有两种方法:
1,Subscribe
2,Observer
如果你只想使用基本功能,选择 Observer 和 Subscriber 是完全一样的
(在 RxJava 的 subscribe 过程中,Observer 也总是会先被转换成一个 Subscriber 再使用)

它们的区别在于:
onStart(): 这是 Subscriber 增加的方法。它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置。这是一个可选方法,默认情况下它的实现为空。但是它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法。具体的可以看扔物线的文章。

unsubscribe(): 这是 Subscriber 所实现的另一个接口 Subscription 的方法,用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件。一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态。 unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好在不用的时候解除引用关系,以避免内存泄露的发生。

所以,由此看来,我们还是使用Subscriber比较好(因为Observer还是会被转成Subscriber并且多了几个方法嘛~)

订阅的方式也有几种

//OnSubscriber订阅了mObservable
        mObservable.subscribe(new OnSubscriber());
//处理Observable对象发出的字符串
    private class OnSubscriber extends Subscriber<String> {

        @Override
        public void onCompleted() {
            //完成
        }

        @Override
        public void onError(Throwable e) {
            //错误
            Log.i("test", e.getMessage());
        }

        @Override
        public void onNext(String s) {
            //往下执行的
            Log.i("test", s);
        }
    }

另外一种:

//OnSubscriber订阅了mObservable1,只接受onNext方法
        mObservable.subscribe(new OnAction1());
 //简化Subscriber,不关心OnComplete和OnError,可以使用Action1类。
    private class OnAction1 implements Action1<String> {

        @Override
        public void call(String o) {
            Log.i("test", o);
        }
    }
//简化Subscriber,不关心OnNext和OnComplete,可以使用Action1类。
    private class OnAction2 implements Action1<Throwable> {

        @Override
        public void call(Throwable o) {
            Log.i("test", o.getMessage());
        }
    }
//简化Subscriber,不关心OnNext和OnError,可以使用Action1类。
    private class OnAction3 implements Action0 {

        @Override
        public void call() {

        }
    }

3,检查订阅&取消订阅

//一行代码订阅
        Subscription subscription = Observable.just(rxJavaBean).subscribe(new OnAction1());

        subscription.isUnsubscribed();//检测是否被订阅
        subscription.unsubscribe();//退订,将从执行点终止

4,操作符(Operators)

这是Rxjava中最重要的部分,也是内容最多的部分。

开头,后面的代码会省略这个头

//操作符(Operators)
        Observable

1)create

 .create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                subscriber.onNext("aaa");
                subscriber.onCompleted();
                subscriber.onError(new Throwable("执行异常信息"));
            }
        });

顾名思义创建一个Observable对象。
参数传入一个实例化的OnSubscribe

2)just

.just("Hello, just world!");
//或者
.just(new String[]{"hellp","world","!"});

just的作用简化了create的过程。
源码中的解释是:返回一个可观测的,发出一个项目,然后完成。
第二种写法相当于多次执行了create方法中的subscriber.onNext(“aaa”);

3)subscribe
subscribe订阅一个被观察者(Observable)。
订阅后,被观察者返回的结果就会在subscribe中展示。

 //操作符(Operators)
        Observable
                .just("hello")
                .subscribe(new Subscriber<String>() {
                    @Override
                    public void onCompleted() {
                           //完成
                    }

                    @Override
                    public void onError(Throwable e) {
                            //错误
                    }

                    @Override
                    public void onNext(String s) {
                            //执行
                            //这里的s就是“hello”
                    }
                });

4)map

//操作符(Operators)
        Observable
                .just("hello")
                .map(new Func1<Object, String>() {//在执行操作之前做一些事情
                    @Override
                    public String call(Object o) {
                        return o.toString() + "!!!!";
                    }
                })
                .map(new OnMap1("aaaa"))
                .map(new OnMap1("aaaa"))
                .map(new OnMap1("aaaa"))
                .subscribe(new OnAction1());

map的作用相当于在create或者just方法后,传入原来的数据,并在原来的数据上做处理后再返回。
那么一旦被订阅(subscribe),打印出来的结果就不再是“hello”了,而是“hello!!!!”。

而且,map可以添加多个,改代码最后结果是“hello!!!!aaaaaaaaaaaa”。

5)from

我们先来定义一个方法query

private Observable<List<String>> query(String text) {
        final List<String> lists = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            lists.add(i + text);
        }
        return Observable.create(new Observable.OnSubscribe<List<String>>() {
            @Override
            public void call(Subscriber<? super List<String>> subscriber) {
                subscriber.onNext(lists);
                subscriber.onCompleted();
                subscriber.onError(new Throwable("无错误信息"));
            }
        });
    }

该方法用于模拟返回多个结果

from的作用就相当于把一个集合for循环了。

//更多操作符
        Observable<List<String>> observable = query("test");
        observable.subscribe(new Action1<List<String>>() {

            @Override
            public void call(List<String> strings) {
                //第一种展示方式
                for (String str : strings) {
                    Log.i("test", str);
                }
                //第二种展示方式
                //Observable.from()方法,它接收一个集合作为输入,然后每次输出一个元素给subscriber
                Observable.from(strings).subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i("test", s + "_tow");
                    }
                });
            }
        });

6)flatMap
flatMap的作用是输出一个新的Observable,且正是我们在Subscriber想要接收的类型。

//Observable.flatMap()接收一个Observable的输出作为输入,同时输出另外一个Observable
        //flatMap输出的新的Observable正是我们在Subscriber想要接收的
        observable
                .flatMap(new Func1<List<String>, Observable<String>>() {
                    @Override
                    public Observable<String> call(List<String> urls) {
                        return Observable.from(urls);
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i("test", s + "_three");
                    }
                });

这样一来,和上段用from实现的逻辑是一样的但是代码却清晰了许多。

flatMap的作用就是接收observable输出的结果,然后返回一个observable对象。

7)filter
filter能够过滤集合中不符合条件的项目

observable
                .flatMap(new Func1<List<String>, Observable<String>>() {
                    @Override
                    public Observable<String> call(List<String> strings) {
                        return Observable.from(strings);
                    }
                })
                .filter(new Func1<String, Boolean>() {//filter能够过滤集合中不符合条件的项目
                    @Override
                    public Boolean call(String s) {
                        return !"0_three".equals(s);
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i("test", s);
                    }
                });

这样,我们得到的集合就剔除了“0_three”这个字段。返回结果中也不会显示。

8)take
只显示规定条数据

observable
                .flatMap(new Func1<List<String>, Observable<String>>() {
                    @Override
                    public Observable<String> call(List<String> strings) {
                        return Observable.from(strings);
                    }
                })
                .take(5)//只显示五条数据
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i("test", s);
                    }
                });

这样,不管原来集合中有几条数据,都只返回5条

9)doOnNext
doOnNext()允许我们在每次输出一个元素之前做一些额外的事情

 observable
                .flatMap(new Func1<List<String>, Observable<String>>() {
                    @Override
                    public Observable<String> call(List<String> strings) {
                        return Observable.from(strings);
                    }
                })
                .take(5)//只显示五条数据
                .doOnNext(new Action1<String>() {//doOnNext()允许我们在每次输出一个元素之前做一些额外的事情
                    @Override
                    public void call(String s) {
                        //这里把获取到的结果缓存到sd卡
                    }
                })
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i("test", s);
                    }
                });

这样,我们在得到结果之前已经将结果缓存到sd卡里面了

10)subscribeOn

使用subscribeOn()指定观察者代码运行的线程;它把以上的代码放在的非ui线程

 observable
                .flatMap(new Func1<List<String>, Observable<String>>() {
                    @Override
                    public Observable<String> call(List<String> strings) {
                        return Observable.from(strings);
                    }
                })
                .take(5)//只显示五条数据
                .doOnNext(new Action1<String>() {//doOnNext()允许我们在每次输出一个元素之前做一些额外的事情
                    @Override
                    public void call(String s) {
                        //这里把获取到的结果缓存到sd卡
                    }
                })
                .subscribeOn(Schedulers.newThread())//使用subscribeOn()指定观察者代码运行的线程;它把以上的代码放在的非ui线程
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i("test", s);
                    }
                });

碰巧的是它的名字里包含subscribe,但它管理的却是observable的线程。

11)observeOn
使用observerOn()指定订阅者运行的线程

 observable
                .flatMap(new Func1<List<String>, Observable<String>>() {
                    @Override
                    public Observable<String> call(List<String> strings) {
                        return Observable.from(strings);
                    }
                })
                .take(5)//只显示五条数据
                .doOnNext(new Action1<String>() {//doOnNext()允许我们在每次输出一个元素之前做一些额外的事情
                    @Override
                    public void call(String s) {
                        //这里把获取到的结果缓存到sd卡
                    }
                })
                .subscribeOn(Schedulers.newThread())//使用subscribeOn()指定观察者代码运行的线程;它把以上的代码放在的非ui线程
                .observeOn(AndroidSchedulers.mainThread())//使用observerOn()指定订阅者运行的线程
                .subscribe(new Action1<String>() {
                    @Override
                    public void call(String s) {
                        Log.i("test", s);
                    }
                });

它的作用则是管理subscribe执行的线程了。

12)线程规划
线程规划:

  • Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。

  • Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。

  • Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

  • Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。

13)将自定义对象封装成Observable对象
//将你的数据封装成Observable对象
newMethod(UserInfoBean);

//将你的数据封装成Observable对象
    private Observable<Object> newMethod(Object slowMethod) {
        //选择推迟加载
        return Observable.defer((Func0<Observable<Object>>) Observable.just(slowMethod));
    }

14)RxAndroid

很抱歉RxAndroid已经废了。。。。。。还在活跃开发中,接口变动太大
目前RxAndroid已经被RxBinding所替代。

RxBinding 提供了一套在 Android 平台上的基于 RxJava 的 Binding API。所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这样的注册绑定对象的 API。

这个后期我也会写出来。

好了,Rxjava的常用方法已经介绍完了。包括创建对象,订阅对象,变换对象,线程控制。

另外,在android中Rxjava配合RxBinding 可以胜任android平台下的一些操作。这也是非常方便的。

而Rxjava也被Retrofit内置了。只要添加依赖库,它们两个就可以配合使用了。

    compile 'io.reactivex:rxjava:1.0.14'
    compile 'io.reactivex:rxandroid:1.0.1'
    compile 'com.squareup.retrofit:adapter-rxjava:2.0.0-beta2'
    compile 'com.squareup.retrofit:retrofit:2.0.0-beta2'
    compile 'com.squareup.retrofit:converter-gson:2.0.0-beta2'

这些依赖库分别是:Rxjava、RxAndroid、retrofit适配Rxjava、retrofit2.0、retrofit转换json数据。

当然还有RxBinding。

以上是关于RxJava——响应式和区域化的优秀框架(java&android)的主要内容,如果未能解决你的问题,请参考以下文章

Java并发,Akka和RxJava之间的区别?

响应式和自适应的区别

响应式编程入门(RxJava)

前端笔记:使用Proxy实现响应式和双向数据绑定

基于Vert.x和RxJava 2构建通用的爬虫框架

全民响应式编程,你还不懂RxJava吗