RxJava学习笔记---简单使用
Posted 逆水当行舟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava学习笔记---简单使用相关的知识,希望对你有一定的参考价值。
如果觉得一篇文章写得好,不要放到收藏夹里面,马上把它看完,如果两天内还没开始看,那就可以删掉了
如果觉得一样技术很好,那就马上去学,不要拖延,不要找借口。如果你一周内还没开始行动,还不如坦荡点放弃
恰如克林克兹所说:
与其感叹路难行,不如马上出发
去年就在看RxJava,每次看一点点,遇到障碍,感叹要理解的东西太麻烦就放弃了。今年决心要搞定它,学习的过程注定孤独单调,恰好现在开始写Blog,就顺便记录下学习笔记,以此自勉。在后面的2-4周计划:
- 明白RxJava的基本用法,API中常用类、函数、变换
- 学习使用RxJava写的开源App,提升熟练度
- 研究RxJava内部实现,研究他为什么要如此设计API,如此好用
- 用RxJava写一个App,开源
RxJava是什么
Rx是什么
Rx全称Reactive Extensions,译为响应式拓展。
微软最先提出这个概念,借用MSDN上的定义:Reactive Extensions(Rx)是一个类库,它集成了异步、基于可观察(observable)序列的事件驱动编程和LINQ-style的查询操作,使用Rx,开发人员
- 可以用observable对象描述异步数据流
- 使用LINQ操作符异步查询数据
- 使用Schedulers控制异步过程中的并发
简而言之
Rx = Observables + LINQ + Schedulers
Rx已经渗透到各个语言中,于是有了:
RxJava、RxJS、Rx.NET、UniRx、RxScala、RxCpp、 RxSwift、Rxphp、Ruby: Rx.rb、Python: RxPY等等
基于frameworks上还有
RxNetty、Rxandroid、RxCocoa
RxJava的定义
RxJava是Rx在Java语言上的扩展。
RxJava
– Reactive Extensions for the JVM – a library for composing asynchronous and event-based programs using observable sequences for the Java VM.
译为:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
RxJava怎么用
在build.gradle
—Module文件中导入
compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'
RxJava观察者模式
RxJava观察者有4个基本概念组成:
- Observer 观察者
- Observable 被观察者
- subscribe() 订阅
- 事件
创建Observer
Observer译为观察者,直接创建实现
Observer<String> observer = new Observer<String>()
@Override
public void onCompleted()
Log.d(TAG, "onCompleted: ");
@Override
public void onError(Throwable e)
Log.d(TAG, "onError: " + e.toString());
@Override
public void onNext(String s)
Log.d(TAG, "onNext: " + s);
;
对比常用的给控件注册一个监听事件
class MyListener extends View.OnClickListener
@Override
public void onClick(View v)
Log.d(TAG, "onClick: ");
这里的RxJava的onNext()
方法就对应MyListener的onClick()
方法,不过比起后者Observer还多了onCompleted()
和onError()
onNext()
事件响应onCompleted()
事件队列结束。不再有新的onNext()
发出时,需要触发其作为结束。onError()
事件队列异常。事件队列处理中若出现异常,onError
触发,队列终止。
一个事件队列中,Error
和Completed
有且只有一个,并且是事件队列的最后一个。
创建Subscriber
Subscriber<String> subscriber = new Subscriber<String>()
@Override
public void onCompleted()
Log.d(TAG, "onCompleted: ");
@Override
public void onError(Throwable e)
Log.d(TAG, "onError: " + e.toString());
@Override
public void onNext(String s)
Log.d(TAG, "onNext: " + s);
;
基本使用的话,Subscriber
和Observer
使用完全一致,实际上在RxJava的subscribe
过程中,Observer
也常常转换为Subscriber
再使用。但是Subscriber
是Oberver
的功能升级
查看其实现代码
public abstract class Subscriber<T> implements Observer<T>, Subscription
....
- 我们知道,Subscriber是一个抽象类,所以我们刚才实现的时候需要实现
onCompleted
、onError
、onNext
三个方法 - 实现
Observer<T>
接口 - 实现
Subscription
接口
Observer<T>
接口的实现表明Observer
能实现的东西他都可以实现
public interface Observer<T>
void onCompleted();
void onError(Throwable e);
void onNext(T t);
那么Subscription
接口实现呢
public interface Subscription
void unsubscribe();
boolean isUnsubscribed();
顾名思义,unsubscribe()
表示反注册;isUnsubscribed()
表示是否已经注册,反注册和是否已经注册出现了,哪注册呢,在这里
private final SubscriptionList subscriptions;
public final void add(Subscription s)
subscriptions.add(s);
@Override
public final void unsubscribe()
subscriptions.unsubscribe();
@Override
public final boolean isUnsubscribed()
return subscriptions.isUnsubscribed();
add()
就是注册。通过SubscriptionList subscriptions
方法转发实现,我们看看SubscriptionList
是什么
public final class SubscriptionList implements Subscription
private LinkedList<Subscription> subscriptions;
private volatile boolean unsubscribed;
public SubscriptionList(Subscription s)
this.subscriptions = new LinkedList<Subscription>();
this.subscriptions.add(s);
@Override
public boolean isUnsubscribed()
return unsubscribed;
public void add(final Subscription s)
if (s.isUnsubscribed())
return;
if (!unsubscribed)
synchronized (this)
if (!unsubscribed)
LinkedList<Subscription> subs = subscriptions;
if (subs == null)
subs = new LinkedList<Subscription>();
subscriptions = subs;
subs.add(s);
return;
s.unsubscribe();
@Override
public void unsubscribe()
if (!unsubscribed)
List<Subscription> list;
synchronized (this)
if (unsubscribed)
return;
unsubscribed = true;
list = subscriptions;
subscriptions = null;
unsubscribeFromAll(list);
private static void unsubscribeFromAll(Collection<Subscription> subscriptions)
if (subscriptions == null)
return;
List<Throwable> es = null;
for (Subscription s : subscriptions)
try
s.unsubscribe();
catch (Throwable e)
if (es == null)
es = new ArrayList<Throwable>();
es.add(e);
Exceptions.throwIfAny(es);
看到开始定义的
private LinkedList<Subscription> subscriptions;
private volatile boolean unsubscribed;
大概就能猜出来,就是通过链表来操作的,具体不再深究。回到主题比起Observer
来说Subsriber
大概有以下几个优点
- 可以使用
unsubscribe()
取消订阅。有什么好处呢?其实和我们使用广播的时候,注册和反注册是一样的道理,防止内存泄漏。 - 可以通过
isUnsubscribed()
获取当前事件是否注册,封装好的方法可以更容易去判断当前的事件注册与否的状态。 onStart()
可以在Subscriber
刚开始的时候做一些准备工作。我们看到他是一个空实现
public void onStart()
// do nothing by default
就如同在使用Volley或者okhttp的时候也有这种同名方法,做一些初始化操作在请求网络最开始的部分。
创建Observable
Observable.create
Observable observable = Observable.create(new Observable.OnSubscribe()
@Override
public void call(Object o)
subscriber.onNext("1");
subscriber.onNext("2");
subscriber.onNext("3");
subscriber.onCompleted();
);
储存一个OnSubscribe
对象作为计划表,一旦Observable
被订阅,call()
方法就会被调用,然后按照对onNext()
和onCompleted()
设定的顺序依次执行。
Observable.just
上面的例子可以等价于
Observable observableJust = Observable.just("1", "2", "3");
just—将参数依次发出去
Observable.from
也等价于这种写法
String[] tests = "1", "2", "3";
Observable observableFrom = Observable.from(tests);
from—顾名思义从什么地方来
运行
使用
observable.subscribe(subscriber);
observable.subscribe(observer);
事件链就可以跑起来了。
这里subscribe()
方法通过下面的方法转发
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable)
//...省略部分代码
//执行onStart初始方法
subscriber.onStart();
//执行Subscriber预设的call()方法
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
//返回传入的subscriber
return hook.onSubscribeReturn(subscriber);
打印数组
把数组内的值依次打印出来
private static final Integer[] INT_ARRAYS = 1, 2, 3, 4, 5;
Observable.from(INT_ARRAYS).subscribe(new Action1<Integer>()
@Override
public void call(Integer integer)
Log.d(TAG, "call: " + integer);
);
显示图片
把mipmap文件里的图片显示到界面
private ImageView ivShow;
@Override
protected void onCreate(Bundle savedInstanceState)
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_main);
ivShow = (ImageView) findViewById(R.id.ivShow);
Observable.create(new Observable.OnSubscribe<Drawable>()
@Override
public void call(Subscriber<? super Drawable> subscriber)
Drawable drawable = getTheme().getDrawable(R.mipmap.bg_splash);
subscriber.onNext(drawable);
subscriber.onCompleted();
).subscribe(new Observer<Drawable>()
@Override
public void onCompleted()
@Override
public void onError(Throwable e)
Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
@Override
public void onNext(Drawable drawable)
ivShow.setImageDrawable(drawable);
);
但是上面的实现没有任何意义,因为在同一个线程里面使用还不如直接调用。RxJava最大的特性是前台回调,后台处理,是伴随着多线程应运而生的。
Scheduler
//直接运行在当前线程
Schedulers.immediate();
//开启一个新线程执行任务
//内部有一个没有上限的线程池,可以重用空闲线程
//相当于 ExecutorService executorService = Executors.newCachedThreadPool();
Schedulers.io();
//有一个固定线程池,大小为CPU核心数+1
//等价于 Executors.newFixedThreadPool(nCore+1)
Schedulers.computation();
刚才的例子:
被创建的数组值在IO线程发出,在主线程处理打印
Observable.from(INT_ARRAYS)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<Integer>()
@Override
public void call(Integer integer)
Log.d(TAG, "call: " + integer);
);
在IO线程加载图片,在主线程显示图片
Observable
.create(new Observable.OnSubscribe<Drawable>()
@Override
public void call(Subscriber<? super Drawable> subscriber)
Drawable drawable = getTheme().getDrawable(R.mipmap.bg_splash);
subscriber.onNext(drawable);
subscriber.onCompleted();
)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Drawable>()
@Override
public void onCompleted()
@Override
public void onError(Throwable e)
Toast.makeText(MainActivity.this, e.toString(), Toast.LENGTH_SHORT).show();
@Override
public void onNext(Drawable drawable)
ivShow.setImageDrawable(drawable);
);
以上是关于RxJava学习笔记---简单使用的主要内容,如果未能解决你的问题,请参考以下文章