RXJAVA的使用
Posted 技术丶从积累开始
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RXJAVA的使用相关的知识,希望对你有一定的参考价值。
参照文档:http://gank.io/post/560e15be2dca930e00da1083
一、什么是观察者模式
就是观察者告诉被观察者我需要你的某种状态,当被观察者做出动作的时候,观察者立刻做出相应反应。所以步骤就是,创建被观察者,逻辑是它要执行的动作。创建观察者,当被观察者做出动作的时候,观察者该怎么做。之后观察者要盯着观察者,这就是连接。
二、RXJAVA中的观察者和被观察者
Observable(被观察者) Observer/Subscribe(观察者)
三、创建 被观察者 和 观察者 并建立连接
@Override protected void onCreate(Bundle savedInstanceState) { super.onCreate(savedInstanceState); setContentView(R.layout.activity_main); //创建被观察者方式一,运用OnSubscribe创造自己的逻辑在其中 Observable observable = Observable.create(new Observable.OnSubscribe<String>(){ @Override public void call(Subscriber<? super String> subscriber) { subscriber.onNext("123"); subscriber.onNext("321"); subscriber.onCompleted(); } }); //方式二: Observable observable1 = Observable.just("123","321"); //方式三: String [] str = {"123","321"}; Observable observable2 = Observable.from(str); /* * 最终的结果都是: * subscriber.onNext("123"); subscriber.onNext("321"); subscriber.onCompleted(); 这个顺序调用的,这里的subscriber就是注册的观察者 * */ //观察者,当被观察者做出动作了,观察者要做的事情逻辑 Observer<String> observer = new Observer<String>() { @Override public void onCompleted() { } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { //当返回数据的时候 } }; //观察者和被观察者建立连接 observable.subscribe(observer); }
Observable.subscribe(Subscriber) 的内部实现是这样的(仅核心代码): // 注意:这不是 subscribe() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。 // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。 public Subscription subscribe(Subscriber subscriber) { subscriber.onStart(); onSubscribe.call(subscriber); return subscriber; }
可以看到subscriber()
做了3件事,但为什么还要返回subscriber这个对象?
这是为了方便 unsubscribe(); 那么unsubscribe()的优点在哪里?
用来及时回收,防止内存溢出。 在什么时候使用比较好?
在onPause()和onStop()的时候比较好。
四、定义不完整回调
什么叫不完整回调,就是创建观察者的话,需要重写三个方法,不完整回调就是,只要重写其中的一个方法就可以了。
用到的类型:Action类 延生类:Action1 Action2 .... ActionX
他们的区别在哪里:传入的参数个数,Action0表示,不传入参数,Action1表示传入一个参数。
Action类中只有一个call()方法,根据放入Observable类摆放顺序,决定其充当Observer类中的哪种方法。(onNext(),onError(),onComplete())
举例:
//没有传入参数 Action0 action0 = new Action0() { @Override public void call() { } }; //只有一个输入参数 Action1<String> action1 = new Action1<String>() { @Override public void call(String s) { //没有东西 } }; //现在有两个输入参数了 Action2<String,String> action2 = new Action2<String, String>() { @Override public void call(String s, String s2) { } }; Observable observable = Observable.just("asd","asd"); //注意事项:根据Observer的结构,发现onNext()必须得有一个参数,所以必须使用Action1 //同理:onError也必须使用Action1 onComplete()只能用Action0 observable.subscribe(action1); observable.subscribe(action1,action1); observable.subscribe(action1,action1,action0); //这样之后就按照顺序传参,可以是一个或者多个。不完整回调的时候,Observable会自动创建不完整的部分。
小练习:将字符串全部打印出来
//采用第三种创建方式 String [] str = {"你好","Hello","I am you friends"}; Observable<String> observable = Observable.from(str); //观察者 Observer<String> observer = new Subscriber<String>() { @Override public void onCompleted() { Log.d("MainActivity","再见"); } @Override public void onError(Throwable e) { } @Override public void onNext(String s) { Log.d("MainActivity",s); } }; //注册 observable.subscribe(observer);
五、线程控制(Scheduler)
观察者模式本身的目的就是『后台处理,前台回调』的异步机制,因此异步对于 RxJava 是至关重要的。
在不指定线程的情况下, RxJava 遵循的是线程不变的原则,即:在哪个线程调用 subscribe()
,就在哪个线程生产事件;在哪个线程生产事件,就在哪个线程消费事件。如果需要切换线程,就需要用到 Scheduler
(调度器)。
api介绍:
Schedulers.immediate()
: 直接在当前线程运行,相当于不指定线程。这是默认的Scheduler
。Schedulers.newThread()
: 总是启用新线程,并在新线程执行操作。Schedulers.io()
: I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的Scheduler
。行为模式和newThread()
差不多,区别在于io()
的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下io()
比newThread()
更有效率。不要把计算工作放在io()
中,可以避免创建不必要的线程。Schedulers.computation()
: 计算所使用的Scheduler
。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个Scheduler
使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在computation()
中,否则 I/O 操作的等待时间会浪费 CPU。- 另外, android 还有一个专用的
AndroidSchedulers.mainThread()
,它指定的操作将在 Android 主线程运行。
有了这几个 Scheduler
,就可以使用 subscribeOn()
和 observeOn()
两个方法来对线程进行控制了。 * subscribeOn()
: 指定subscribe()
所发生的线程,即 Observable.OnSubscribe
被激活时所处的线程。或者叫做事件产生的线程。 * observeOn()
: 指定Subscriber
所运行在的线程。或者叫做事件消费的线程。
//所以只要在刚才练习的过程中加上 //指定在线程池中发生 observable.subscribeOn(Schedulers.io()); //指定在主线程中消费 observable.observeOn(AndroidSchedulers.mainThread()); //这样Observable就会在线程池中运行,Observer在主线程中执行
六、变换
变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列
①、Observable.map() (解决一对一事件的转换)
Observable.just("images/logo.png") // 输入类型 String .map(new Func1<String, Bitmap>() { @Override public Bitmap call(String filePath) { // 参数类型 String return getBitmapFromPath(filePath); // 返回类型 Bitmap } }) .subscribe(new Action1<Bitmap>() { @Override public void call(Bitmap bitmap) { // 参数类型 Bitmap showBitmap(bitmap); } });
发现通过map()方法,将一个String类型的变量,转换成了Bitmap类型。之后再通过观察者执行。
知识:Func类:与Action类是一样的,但是Func类有返回值,但是Action类没有而已。
问题:map()只能接收一个对象(一对一的转化),当我有多个事件序列的时候,我该怎么办(就是有多个String 的时候),还有当我有一个student对象,该对象对应着多个课程(一对多的转换),我怎么将这么多的课程传给subscribe();
②、 flatMap()
(解决一对多事件的转换)
Student[] students = ...; Subscriber<Course> subscriber = new Subscriber<Course>() { @Override public void onNext(Course course) { Log.d(tag, course.getName()); } ... }; Observable.from(students) .flatMap(new Func1<Student, Observable<Course>>() { @Override public Observable<Course> call(Student student) { return Observable.from(student.getCourses());//①发现这里返回的是一个Observable } }) .subscribe(subscriber);
①、原理:flatMap()
中返回的是个 Observable
对象,并且这个 Observable
对象并不是被直接发送到了 Subscriber
的回调方法。而是将它激活,于是它开始发送事件,每一个被Observable创建出来的 发送的事件,都被汇入同一个 Observable
,而这个 Observable
负责将这些事件统一交给 Subscriber
的回调方法。
理论解析:把事件拆成了两级,通过一组新创建的 Observable
将初始的对象『铺平』之后通过统一路径分发了下去。
③、lift() (代理模式转换) (最好画图理解)
// 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。 // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。 public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) { return Observable.create(new OnSubscribe<R>() { @Override public void call(Subscriber subscriber) { Subscriber newSubscriber = operator.call(subscriber); newSubscriber.onStart(); onSubscribe.call(newSubscriber); } }); }
发现,该代码返回了一个Obervable对象。原理:①、当调用lift()方法的时候,创建一个Observable,之后再调用subcribe()的时候,建立联系的Observer是放在新的Obserable上的,然后根据
observable.lift(new Observable.Operator<String, Integer>() { @Override public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) { // 将事件序列中的 Integer 对象转换为 String 对象 return new Subscriber<Integer>() { @Override public void onNext(Integer integer) { subscriber.onNext("" + integer); } @Override public void onCompleted() { subscriber.onCompleted(); } @Override public void onError(Throwable e) { subscriber.onError(e); } }; } });
发现在operator实现逻辑之后,才调用Observer,之后再创建一个Observer(就变成旧Obserable的Observer了),最后调用旧Obserable的call()方法。
这是一种代理模式,比如说onNext()方法,Obserable直接调用onNext(),中间出现一个代理,也是onNext()方法,其中实现了自己的逻辑,最后再调用之前的onNext()方法。
就相当于,同样的方法名,被加上了一层逻辑,但是之前的调用方式还是一样的。这个代理就是operate类。
注:代理机制常用于通过事件拦截和处理实现事件序列的变换。
个人感觉RXJAVA的强大之处:①、能够在一个界面实现异步加载。②、利用变形实现逻辑上嵌套的解耦
以上是关于RXJAVA的使用的主要内容,如果未能解决你的问题,请参考以下文章
使用 RxJava 将数据从 ViewModel 移动到 Fragment
如何在基础活动暂停时暂停rxjava Observable.interval