RxJava入门
Posted 赛艇队长
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava入门相关的知识,希望对你有一定的参考价值。
项目小版本上线,抽空简单学习了下久仰大名的RxJava
一、引入
个人觉得rxjava的特点:
- 强大灵活的事件流处理(多线程/多事件/复合对象)
- 强大灵活优雅简洁的异步
- 链式调用
- 可自动Lambda化
实现:RxJava 是通过一种扩展的观察者模式来实现的
类比 | 类比 | 实际 | 实际 | 职责 |
演讲者 | Button |
(可)被订阅者
(同右)
|
(可)被观察者
Observable
|
决定什么时候触发事件以及触发怎样的事件 |
听众 | OnClickListener |
订阅者
Subscriber
|
观察者
Observer
|
决定事件触发的时候将有怎样的行为 |
买票 | setOnClickListener() |
订阅
subscribe
|
注册 | |
命令 | onClick() | 事件 | 事件 |
演讲者只有Observable一种类,但是听众有Subscriber和Observer两种类,有点跟演讲者一个听众很多类似
与传统观察者模式不同, RxJava 的事件回调方法除了普通事件 onNext() (相当于 onClick() / onEvent())之外,还定义了两个特殊的事件:onCompleted() 和 onError()。
- 很多onNext()
- onCompleted(): 事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。RxJava 规定,当不会再有新的onNext() 发出时,需要触发 onCompleted() 方法作为标志。
- onError(): 事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
在一个正确运行的事件序列中, onCompleted() 和 onError() 有且只有一个,并且是事件序列中的最后一个。需要注意的是,onCompleted() 和 onError() 二者也是互斥的,即在队列中调用了其中一个,就不应该再调用另一个。
二、实现方式
步骤:
- 创建演讲者
- 创建听众
- 买票
1、创建演讲者
创建演讲者的方式有点特殊,它不是通过new的方法,而是通过一系列不同的内部静态工厂方法来创建的,最普通的有create( ) 方法
1.1.create( ) 方法创建
- 不是通过new的方法,而是一个内部静态工厂方法create( )来创建,这个方法需要传入一个 OnSubscribe 对象作为参数
- 不过换个角度,把它当成一个需要传入一个参数的构造方法就好了(虽然内部是有点其他货的)
- 方法具体是:Observable<T> create ( OnSubscribe<T> f )
- 这个 OnSubscribe 类本身是Observable的内部类
- 这个对象在create( )时传入后会存储在返回的 Observable 对象中
- 当 Observable 被订阅的时候,OnSubscribe 的 call() 方法会自动被调用
- 这个call方法会遍历传入所有的听众o,这些听众都实现了听众的那三个约定方法,在这里就可以执行自己需要的业务代码,并在需要的时候回调听众的那三个约定方法
- 换个角度说,OnSubscribe 的作用相当于一个计划表或者说事件发生器
- 这个泛型T是输出类型,也就是会传给听众的类型
简写:
Observable observable = Observable.create(new OnSubscribe() {
@Override
public void call(Object o) { } }); |
具体的实际的写法:
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onCompleted(); } }); |
1.2.两个快捷创建静态工厂方法 just( ) , from( )
1.2.1.Observable<T> just( T t1, T t2 )
- 这个方法有10个重载,分别可以传入1个到10个参数(orz)
- 这是一个简便方法,会将传入的参数依次发送出来
- 可以写成基本的 create( ) 形式
Observable observable = Observable.just("Hello", "Hi");
|
等于上面的 create( ):
Observable observable = Observable.create(new Observable.OnSubscribe<String>() {
@Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("Hello"); subscriber.onNext("Hi"); subscriber.onCompleted(); } });
|
1.2.2.
Observable<T> from( T[ ] array )
Observable<T> from( Iterable<? extends T> array )
- 将传入的数组或 Iterable 拆分成具体对象后,依次发送出来
- 可以写成基本的 create( ) 形式
String[] words = {"Hello", "Hi"};
Observable observable = Observable.from(words);
|
2、创建听众
这个有几种方式
2.1.通过rx包中的 Observer 接口
- 注意不是java中util包中的接口
Observer<String> observer = new Observer<String>() {
@Override public void onNext(String s) {} @Override public void onCompleted() {} @Override public void onError(Throwable e) {} }; |
2.2.通过rx包中的 Subscriber 抽象类[推荐]
- 它实现了 Observer 接口,并进行了一些扩展
- 使用方法跟Observer一样,而且必须实现的方法就是 Observer 接口中的方法,在订阅(subscribe)的时候,Observer 也总是会先被转换成一个 Subscriber 再使用,所以基本建议可以用Observer的地方都用Subscriber吧
- 这个抽象类扩展了两个可选的方法:
- onStart( )
- 它会在 subscribe 刚开始,而事件还未发送之前被调用,可以用于做一些准备工作,例如数据的清零或重置
- 需要注意的是,如果对准备工作的线程有要求(例如弹出一个显示进度的对话框,这必须在主线程执行), onStart() 就不适用了,因为它总是在 subscribe 所发生的线程被调用,而不能指定线程。要在指定的线程来做准备工作,可以使用 doOnSubscribe() 方法
- unsubscribe( )
- 这个方法用于取消订阅。在这个方法被调用后,Subscriber 将不再接收事件
- 一般在这个方法调用前,可以使用 isUnsubscribed() 先判断一下状态
- unsubscribe() 这个方法很重要,因为在 subscribe() 之后, Observable 会持有 Subscriber 的引用,这个引用如果不能及时被释放,将有内存泄露的风险。所以最好保持一个原则:要在不再使用的时候尽快在合适的地方(例如 onPause() onStop() 等方法中)调用unsubscribe() 来解除引用关系,以避免内存泄露的发生
- onStart( )
- 这个泛型T是输入类型,也就是听众能接收的类型
Subscriber<String> subscriber = new Subscriber<String>() {
@Override public void onNext(String s) {} @Override public void onCompleted() {} @Override public void onError(Throwable e) {} }; |
2.3.快捷创建方法:ActionX 接口[推荐]
听众也有快捷的创建方式,那就是通过ActionX 接口
- ActionX 接口有很多,Action0 和 Action1 最常用,还有Action2, Action3,X代表传入的参数的数目
- 他们其实是对具有不同传入参数、但是无返回值的方法的包装接口
- subscribe()会根据这些ActionX对象生成正常的Subscriber
以0和1为例
- Action0
- Action0 是 RxJava 的一个接口,它只有一个方法 call(),这个方法是无参无返回值的
- 由于 onCompleted() 方法也是无参无返回值的,因此 Action0 可以被当成一个包装对象,将 onCompleted() 的内容打包起来将自己作为一个参数传入 subscribe() 以实现不完整定义的回调
- 这样其实也可以看做将 onCompleted() 方法作为参数传进了subscribe(),相当于其他某些语言中的『闭包』
- Action1
- Action1 也是一个接口,它同样只有一个方法 call(T param),这个方法也无返回值,但有一个参数;
- 与 Action0 同理,由于 onNext(T obj) 和 onError(Throwable error) 也是单参数无返回值的,因此 Action1可以将 onNext(obj) 和 onError(error) 打包起来传入 subscribe() 以实现不完整定义的回调
正常使用方式是:
- new 出 action 对象
- 组合塞进 subsribe 方法
通过Action0、Action1构造三种方法的包装对象
// onNext()
Action1<String> action1 = new Action1<String>() { @Override public void call(String s) {} }; // onError() Action1<Throwable> action2 = new Action1<Throwable>() { @Override public void call(Throwable throwable) {} }; // onCompleted() Action0 action3 = new Action0() { @Override public void call() {} }; |
// 自动创建 Subscriber ,并使用 action1 来定义 onNext()
observable.subscribe(action1); // 自动创建 Subscriber ,并使用 action1 和 action2 来定义 onNext() 和 onError() observable.subscribe(action1, action2); // 自动创建 Subscriber ,并使用 action1、 action2 和 action3 来定义 onNext()、 onError() 和 onCompleted() observable.subscribe(action1, action2, action3); |
如果只是要onNext( ) ,这样使用匿名类也很清晰
Observable.just("1","2","3")
.subscribe(new Action1<String>() { @Override public void call(String name) { Log.d(tag, name); } }); |
3、买票(进行订阅)
创建了 Observable 和 Observer 之后,再用 subscribe() 方法将它们联结起来,整条链子就可以工作了
代码形式很简单:
observable.subscribe(subscriber);
|
- 有人可能会注意到, subscribe() 这个方法有点怪:它看起来是『observalbe 订阅了 subscriber』而不是『subscriber 订阅了 observalbe』,这看起来就像『杂志订阅了读者』一样颠倒了对象关系。这让人读起来有点别扭
- 因为如果把 API 设计成 subscriber.subscribe(observable) ,虽然更加符合思维逻辑,但对流式 API 的设计就造成影响了,因为对于流事件,是有一个生产者和一系列消费者的,所以生产者放前边,后边跟一串消费者才是更流的形式
- 这种形式其实跟普通的java观察者模式很像,演讲者.add(听众)
三、快速使用及举例
使用rxjava首先是引入依赖:
1.基本的rxjava:
compile ‘io.reactivex:rxjava:1.0.14‘
2.带android特性的rxjava:
compile ‘io.reactivex:rxandroid:1.0.1‘
3.支持rxjava的网络加载库retrofit
compile ‘com.squareup.retrofit:retrofit:1.9.0‘
|
然后几个简单的例子:
1.真正最简单的例子:打印几个字符串
- 三步走
- 最简单的just方法
- 一个普通的subscriber
Observable.just("1","2","3").subscribe(new Subscriber<String>() {
@Override public void onNext(String s) { Log.d(tag, s); } @Override public void onCompleted() {} @Override public void onError(Throwable e) {} }); |
2.复杂一点点:给ImageView set 图片
- 需求:
- 实现:
- 还是基本的三步走,稍微注意下格式(流式结构)
final Drawable drawable = getActivity().getResources().getDrawable(R.drawable.1);
final ImageView imageView = new ImageView(getActivity()); Observable.create(new Observable.OnSubscribe<Drawable>() { @Override public void call(Subscriber<? super Drawable> subscriber) { subscriber.onNext(drawable); subscriber.onCompleted(); } }).subscribe(new Subscriber<Drawable>() { @Override public void onNext(Drawable drawable) { imageView.setImageDrawable(drawable); } @Override public void onCompleted() {} @Override public void onError(Throwable e) { Log.d(tag, "error!"); } }); |
四、线程控制:Scheduler 和 subscribeOn()、observeOn()
- 在不指定线程的情况下, RxJava 遵循的是线程不变的原则
- 即:在哪个线程调用 subscribe(),就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。
- 如果需要切换线程,就需要用到 Scheduler (调度器)
1.Scheduler
RxJava 通过Scheduler来指定每一段代码应该运行在什么样的线程
RxJava 已经内置了几个 Scheduler ,它们已经适合大多数的使用场景:
- Schedulers.immediate()
- 直接在当前线程运行,相当于不指定线程
- 这是默认的 Scheduler
- Schedulers.newThread()
- 总是启用新线程,并在新线程执行操作
- Schedulers.io( )
- I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler
- 行为模式和 newThread() 差不多,区别在于 io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率
- 不要把计算工作放在 io() 中,可以避免创建不必要的线程
- Schedulers.computation( )
- 计算所使用的 Scheduler,这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算
- 这个 Scheduler 使用的固定的线程池,大小为 CPU 核数
- 不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU
- Android 还有一个专用的 AndroidSchedulers.mainThread()
- 它指定的操作将在 Android 主线程运行
2.subscribeOn() 和 observeOn()
有了这几个 Scheduler ,就可以使用 subscribeOn() 和 observeOn() 两个方法来对线程进行控制了
- subscribeOn( )
- 指定 subscribe() 所发生的线程,即 Observable.OnSubscribe 被激活时所处的线程
- 而这其实就是事件产生的线程,也就是Observable活动的线程
- (看到这不免有点乱,Observable活动在subscribeOn指定的线程,那这里就只能用[Observable.OnSubscribe 被激活]这件事来记了)
- observeOn( )
- 指定 Subscriber 所运行在的线程。或者叫做事件消费的线程
- (同样正好相反)
3.例子
Observable.just(1, 2, 3, 4)
.subscribeOn(Schedulers.io()) // 指定 subscribe() 发生在 IO 线程 .observeOn(AndroidSchedulers.mainThread()) // 指定 Subscriber 的回调发生在主线程 .subscribe(new Action1<Integer>() { @Override public void call(Integer number) { Log.d(tag, "number:" + number); } });
|
|
4.拓展
4.1.事件消费的线程可多次变换
- observeOn() 指定的是 Subscriber 的线程,而这个 Subscriber 不一定是最终subscribe() 时的Subscriber ,而是 observeOn() 执行时的当前 Observable 所对应的 Subscriber ,即它的直接下级 Subscriber
- 换句话说,observeOn() 指定的是它之后的操作所在的线程
- 因此如果有多次切换线程的需求,只要在每个想要切换线程的位置调用一次 observeOn() 即可
- 如下,通过 observeOn() 的多次调用,程序实现了线程的多次切换
Observable.just(1, 2, 3, 4) // IO 线程,由 subscribeOn() 指定
.subscribeOn(Schedulers.io()) .observeOn(Schedulers.newThread()) .map(mapOperator1) // 新线程,由 observeOn() 指定 .observeOn(Schedulers.io()) .map(mapOperator2) // IO 线程,由 observeOn() 指定 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); // Android 主线程,由 observeOn() 指定 |
4.2.事件产生线程是固定的
不同于 observeOn() , subscribeOn() 的位置虽然放在哪里都可以,但它是只能调用一次
当使用了多个subscribeOn() 的时候,只有第一个 subscribeOn() 起作用
4.3.流程开始前的初始化问题:【Subscriber 的 onStart( )】【Observable 的 doOnSubscribe( )】
- 在前面讲 Subscriber 的时候,提到过 Subscriber 的 onStart() 可以用作流程开始前的初始化
- 然而 onStart() 由于在subscribe() 发生时就被调用了,也就是没有被observeOn() 指定线程,因此是执行在 subscribe() 被调用时的线程也就是原线程
- 这就导致如果 onStart() 中含有对线程有要求的代码(例如在界面上显示一个 ProgressBar,这必须在主线程执行),将会有线程非法的风险,因为有时你无法预测subscribe() 将会在什么线程执行
解决方法
- Observable(不是Subscriber的)有一个方法doOnSubscribe()
- 它和 Subscriber.onStart() 同样是在subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程
- 默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程
- 而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程
Observable.create(onSubscribe)
.subscribeOn(Schedulers.io()) .doOnSubscribe(new Action0() { @Override public void call() { progressBar.setVisibility(View.VISIBLE); // 需要在主线程执行 } }) .subscribeOn(AndroidSchedulers.mainThread()) // 指定主线程 .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber); |
五、变换
- RxJava 提供了对事件序列进行变换的支持,这是它的核心功能之一,也是大多数人说『RxJava 真是太好用了』的最大原因
- 所谓变换,就是将事件序列中的对象或整个事件队列进行加工处理,转换成不同的事件或事件序列
变换节点是由一个【变换算法】和一个【变换枢纽】构成的
- 变换算法是一系列方法,比如:map()、flatMap(),他们决定变换的方式,是随方法固定的
- 变换枢纽是一种类似管道枢纽的容器,里边由你写具体的变换细节;输入和输出就像管道,如何分流就像管道的布线
- 往一个变换算法里传入一个变换枢纽就组成了一个变换节点
1.变换枢纽
- 变换枢纽是FuncX系列接口的对象
- FuncX系列接口跟前面讲的ActionX系列接口非常像
- 他们是位于 rx.functions 包下的全部两个系列接口
- ActionX系列是可以传入1个或多个参数,但是无返回值的call方法的包装
- FuncX系列是可以传入1个或多个参数,但是有一个返回值的call方法的包装
- 正是两个方法的唯一区别(是否有返回值)决定了在rx链中两者不同的角色
- ActionX系列由于没有返回值,所以只能作为链的终点,也就是为观众服务,可以被组合构成观众
- FuncX系列由于有返回值,但他又不能作为链的起点,所以就自然成了我们这里要说的新角色:链的中继者,或者说变换器
- FuncX系列的泛型位置是这样:前面的所有参数是输入参数,最后一个参数是输出参数
2.变换算法
2.1.map( )
- map() 方法将参数中的 String 对象转换成一个 Bitmap 对象后返回
- 在经过 map() 方法后,事件的参数类型也由 String转为了 Bitmap
Observable.just("images/logo.png") // 输入类型 String
.map(new Func1<String, Bitmap>() { @Override public Bitmap call(String filePath) { // 参数类型 String return getBitmapFromPath(filePath); // 返回类型 Bitmap } }) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { // 参数类型 Bitmap showBitmap(bitmap); } }); |
2.2.flatMap( )
这是一个很有用但不太好难理解的变换
2.2.1.情景推导
首先假设这么一种需求:有一个数据结构『学生』,现在需要打印出一组学生的名字
用上面提到的map方法实现起来很简单:
Student[] students = new Student[10];
Observable.from(students) .map(new Func1<Student, String>() { @Override public String call(Student student) { return student.getName(); } }) .subscribe(new Subscriber<String>() { @Override public void onNext(String name) { Log.d(tag, name); } ... }); |
再假设:如果要打印出每个学生所需要修的所有课程的名称呢?(需求的区别在于,每个学生只有一个名字,但却有多个课程)
这个时候用不了map了,先用普通方式实现一下:
Student[] students = new Student[10];
Observable.from(students) .subscribe(new Subscriber<Student>() { @Override public void onNext(Student student) { List<Course> courses = student.getCourses(); for (int i = 0; i < courses.size(); i++) { Course course = courses.get(i); Log.d(tag, course.getName()); } } ... }); |
2.2.2.引入
- 上面实现倒是实现了,但是这个for循环的存在不免显得有点不太优雅
- 如果不想在 Subscriber 中使用 for 循环,而是希望在 Subscriber 中直接接收单个的 Course 对象呢(这对于代码复用很重要)?
- 用 map() 显然是不行的,因为 map() 是一对一的转化,而我现在的要求是一对多的转化。那怎么才能把一个 Student 转化成多个 Course 呢?
- 这个时候,就需要用 flatMap( ) 了
2.2.3.实现
Student[] students = new Student[10];
Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses()); } }) .subscribe(new Subscriber<Course>() { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } ... });
|
- flatMap() 和 map() 有一个相同点:也是把传入的参数转化之后返回另一个对象
- 不同的是, flatMap() 中返回的是个 Observable 对象,并且这个 Observable 对象并不是被直接发送到了 Subscriber 的回调方法中,而是
- 先使用传入的事件对象创建一个 Observable 对象
- 但是并不发送这个 Observable, 而是将它激活(subscribe),于是它开始发送事件
- 每一个创建出来的子Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件统一交给 Subscriber 的回调方法
- 上面这三个步骤,把事件拆成了两级,通过一组新创建的 Observable 将初始的对象『铺平』之后通过统一路径分发了下去
- 这个『铺平』就是 flatMap() 所谓的 flat
- 每个list就像卷起的纸,flatMap就是把一卷卷纸展开再连在一起
2.3.throttleFirst( )
在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤,例如按钮的点击监听器
妈妈再也不怕我的用户手抖点开两个重复的界面啦
RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释
.throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms .subscribe(subscriber); |
六、应用
1.Retrofit
2. RxBinding
- RxBinding 是 Jake Wharton 的一个开源库,它提供了一套在 Android 平台上的基于 RxJava 的 Binding API
- 所谓 Binding,就是类似设置 OnClickListener 、设置 TextWatcher 这样的注册绑定对象的 API
举个设置点击监听的例子。使用 RxBinding ,可以把事件监听用这样的方法来设置:
Button button = ...;
RxView.clickEvents(button) // 以 Observable 形式来反馈点击事件 .subscribe(new Action1<ViewClickEvent>() { @Override public void call(ViewClickEvent event) { } }); |
看起来除了形式变了没什么区别,实质上也是这样。甚至如果你看一下它的源码,你会发现它连实现都没什么惊喜:它的内部是直接用一个包裹着的 setOnClickListener() 来实现的。
然而,仅仅这一个形式的改变,却恰好就是 RxBinding 的目的:扩展性。
通过RxBinding 把点击监听转换成 Observable 之后,就有了对它进行扩展的可能。
扩展的方式有很多,根据需求而定。一个例子是前面提到过的 throttleFirst() ,用于去抖动,也就是消除手抖导致的快速连环点击:
RxView.clickEvents(button)
.throttleFirst(500, TimeUnit.MILLISECONDS) .subscribe(clickAction); |
3. 各种异步操作
前面举的 Retrofit 和 RxBinding 的例子,是两个可以提供现成的 Observable 的库。而如果你有某些异步操作无法用这些库来自动生成 Observable,也完全可以自己写。例如数据库的读写、大图片的载入、文件压缩/解压等各种需要放在后台工作的耗时操作,都可以用 RxJava 来实现,有了之前几章的例子,这里应该不用再举例了。
4. RxBus
RxBus 名字看起来像一个库,但它并不是一个库,而是一种模式,它的思想是使用 RxJava 来实现了 EventBus ,而让你不再需要使用Otto 或者 GreenRobot 的 EventBus。至于什么是 RxBus,可以看这篇文章。顺便说一句,Flipboard 已经用 RxBus 替换掉了 Otto,目前为止没有不良反应。
附录
1.关于观察者模式
其实觉得这个名字非常容易误解:观察者模式里两个角色,观察者和被观察者,两个方面会让人误解:
- 主被动关系,往往会有人觉得观察者是主动观察,其实不然
- 名字就是一字之差,容易混淆,造成理解障碍
第二点就不说了,来讨论下第一点,其实里边一个是事件发出者,一个是被事件驱动者,主被动关系是反过来的,应该是 主人和仆人,指挥官和小兵,演讲者和听众,button和onclicklistener 的关系,观察者其实是被动触发的,而不是主动观察
观察者模式用button和onclicklistener的模型来类比真是非常形象好理解(因为:1常用,平易近人;2非常符合)
对设置 OnClickListener 来说, View 是被观察者, OnClickListener 是观察者,二者通过 setOnClickListener() 方法达成订阅关系。订阅之后用户点击按钮的瞬间,Android Framework 就会将点击事件发送给已经注册的 OnClickListener 。采取这样被动的观察方式,既省去了反复检索状态的资源消耗,也能够得到最高的反馈速度。
- Button -> 被观察者
- OnClickListener -> 观察者
- setOnClickListener() -> 订阅
- onClick() -> 事件
参考资料:http://www.jcodecraeer.com/a/anzhuokaifa/androidkaifa/2015/1012/3572.html
以上是关于RxJava入门的主要内容,如果未能解决你的问题,请参考以下文章