RxJava1 学习笔记
Posted 若兰明月
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava1 学习笔记相关的知识,希望对你有一定的参考价值。
RxJava1 学习笔记
标签(空格分隔): 开源项目 android开发
观察者模式UML类图
观察者场景
- 1、一个方面的操作依赖另一个方面的状态 变化
- 2、如果在更改一个对象的时候,需要同时连带改变其他的对象
- 3、当一个对象必须通知其他的对象,但是你又希望这个对象和其他被通知的对象是松散耦合的
RxJava四要素
- 1、被观察者
- 2、观察者
- 3、订阅(被观察者和观察者之间的契约)
- 4、事件
事件
- 响应式编程
- 生活中的对应
//第一步:创建被观察者:create
Observable observable = Observable.create(new Observable.OnSubscribe<String>()
@Override
public void call(Subscriber<? super String> subscriber)
subscriber.onNext("Hello");
subscriber.onNext("Imooc");
subscriber.onCompleted();
);
//通过just方法来创建被观察者
Observable observableJust = Observable.just("Hello", "Imooc");
//通过from方法来创建被观察者
String[] parameters = "hello", "Imooc";
Observable observableForm = Observable.from(parameters);
//第二步:创建观察者
Observer<Object> observer = new Observer<Object>()
@Override
public void onCompleted()
@Override
public void onError(Throwable e)
@Override
public void onNext(Object o)
;
public void doRxJava()
//第三步:订阅
observable.subscribe(observer);
//也可以下方的方式创建
Observable.create(new Observable.OnSubscribe<String>()
@Override
public void call(Subscriber<? super String> subscriber)
).subscribe(new Observer<String>()
@Override
public void onCompleted()
@Override
public void onError(Throwable e)
@Override
public void onNext(String s)
);
三个关键对象和一个核心的方法
- 1、Observable(被观察者)
- 2、OnSubscribe
- Observable(被观察者)内部的一个对象,可以看成被观察者通知观察者的nofity()方法。
- 3、Subscriber(观察者)
- 4、subscribe()
- 订阅方法 完成被观察者和观察者的订阅
第一步,创建Observable,通过Observable.create()方法,当然create()里面的参数就是OnSubscribe参数
这个参数通过RxJavahooks.onCreate() //这个RxJavahooks是1.3版本之后加入的(类似一个抽象的代理类) 最终还是转换成Onservable这个类型
做完这方法之后,就可以对Onservable立面的onSubscribe这个参数进行赋值了
protected Observable(OnSubscribe<T> f)
this.onSubscribe = f;
第二步就是创建观察者
//第二步:创建观察者
Observer<Object> observer = new Observer<Object>()
@Override
public void onCompleted()
@Override
public void onError(Throwable e)
@Override
public void onNext(Object o)
;
Subscriber<String> subscriber = new Subscriber<String>()
@Override
public void onCompleted()
@Override
public void onError(Throwable e)
@Override
public void onNext(String s)
;
可以看到有两种方式创建观察者,一种是直接创建Observer,另一种就是创建Subscriber,其中Subscriber是实现了Observer这个接口,不过即使使用Observer创建观察者,最终内部还是把他转换成Subscriber这个。(下方这个就是Observer被订阅的方法)
public final Subscription subscribe(final Observer<? super T> observer)
//可以看到这个地方 如果是匹配成Subscriber 那么进行subscribe()这个方法的传递
if (observer instanceof Subscriber)
return subscribe((Subscriber<? super T>)observer);
if (observer == null)
throw new NullPointerException("observer is null");
//如果 不满足以上的 那么就直接返回
return subscribe(new ObserverSubscriber<T>(observer));
Subscriber
//实现了Observer和Subscription这两个接口
abstract class Subscriber<T> implements Observer<T>, Subscription
//该类里面有
private final SubscriptionList subscriptions; 也就是各种订阅事件集合,当我们取消订阅的时候会调用Subscription里面的unsubscribe()方法去进行订阅取消。
if (!unsubscribed)
List<Subscription> list;
synchronized (this)
if (unsubscribed)
return;
unsubscribed = true;
list = subscriptions;
subscriptions = null; //置空
// we will only get here once
unsubscribeFromAll(list);
进行订阅subscribe(Subscriber
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable)
// validate and proceed 进行验证
if (subscriber == null)
throw new IllegalArgumentException("subscriber can not be null");
if (observable.onSubscribe == null)
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
// new Subscriber so onStart it 这个是一个空方法 我们可以在进行订阅之前进行相关操作 例如loading()
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped 对subscriber进行非SafeSubscriber的处理 SafeSubscriber就是对Subscriber进行了一系列的处理(包装) 更加安全
if (!(subscriber instanceof SafeSubscriber))
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try
// allow the hook to intercept and/or decorate
//调用call()方法 进行订阅
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
//然后进行Subscription的返回 返回类型是Subscription 我们知道这个Subscription是每一个观察者必须实现的一个接口,也就是说我们完成一次订阅 就会在Subsriptionlist里面进行增加一个观察者,当我们需要取消某个订阅的时候 就可以采用unscribered()方法进行取消需要取消的那个订阅
return RxJavaHooks.onObservableReturn(subscriber);
catch (Throwable e)
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed())
RxJavaHooks.onError(RxJavaHooks.onObservableError(e));
else
// if an unhandled error occurs executing the onSubscribe we will propagate it
try
subscriber.onError(RxJavaHooks.onObservableError(e));
catch (Throwable e2)
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
RxJavaHooks.onObservableError(r);
// TODO why aren't we throwing the hook's return value.
throw r; // NOPMD
return Subscriptions.unsubscribed();
RxJava的相关操作符
变换操作符
- 就是将事件序列中的对象或者整个序列进行加工处理
Map操作符
- 就是把一个事件转换成另一个事件
- map()函数接收一个Func1类型的参数,然后把这个Func1应用到每一个由Observable发射的值上,将发射的值转换为我们期望的值。
Observable
// just()方法return ScalarSynchronousObservable.create(value); 而ScalarSynchronousObservable是Observable的一个子类 所以也是一个Observable 可以进行链式调用的基础
.just("map/iamge/image.png")
//这里的map里面的Func1的这个函数接收两个参数
//第一个参数就是Observable传递的String类型的参数
//第二个参数是我们需要转换成的参数 这里是Bitmap
.map(new Func1<String, Bitmap>()
@Override
public Bitmap call(String filePath)
return getBitmap(filePath);
)
//subscribe这个是完成订阅的核心之一
.subscribe(new Action1<Bitmap>()
@Override
public void call(Bitmap bitmap)
showBitmap(bitmap);
);
flatMap操作符
- 1、将传入的事件对象转换成一个Observable对象
- 2、这是不会直接发送这个Observable,而是将这个Observable激活让它自己开始发送事件
3、每一个创建出来的Observable发送的事件,都会汇入同一个Observable
public Subscription flatMap() return Observable.just("dddd", "ddddd", "dadafadf") //传入的参数类型 //第一个参数是String类型 也就是上面just里面的参数类型的item类型 //第二个类型则是Observable类型 .flatMap(new Func1<String, Observable<String>>() @Override public Observable<String> call(String s) return createIvservable(s); ).subscribe(new Action1<String>() @Override public void call(String s) );
RxJava线程控制
Android多线程编程原则
- 第一、不要阻塞UI线程
- 第二、不要在UI线程之外访问UI组件
Android多线程
- 解决
- handler / asynctask
- rxjava:线程控制
RxJava遵循线程不变原则
线程控制符
- Schedulers.immediate()
- 默认不切换操作
- Schedulers.newThread()
- 总是去切换线程,默认在新线程里面进行操作
- Schedulers.io()
- 读取文件,io操作 类似第二个newThread()这个方法,但是io()里面设定有一个无数量上限的线程池,可以复用更多,在大多数情况下,这个io()方法比我们调用newThread()方法效率更高
Schedulers.computation()
- 主要用于计算使用
AndroidSchedulers.mainThread()
- 切换到主线程
RxJava如何进行线程控制
- 1、subscribeOn()
- 指定我们的subscribe ,也就是我们订阅观察者的时候所发生的线程,也就是我们的Observable里面的onSubscribe被激活的时候所处的线程。
- 2、observeOn()
- 指定我们的subscribe所运行所在的线程,或者称为事件消费所在的线程。
observeOn()指定的是他之后的操作所在的线程,observeOn()是支持多次调用的。subscribeOn()则是只能调用一次
SubscribeOn方法
- 1、会生成一个新的Observable
- 2、onSubscribe会在目标Subscriber订阅的时候使用传入的Scheduler的worker作为线程调度执行者
- 3、在对应的线程中通知原始Observable发送消息给这个过程中临时生成的Subscriber
- 4、这个Subscriber又会通知到目标Subscriber,从而完成我们的subscribeOn的过程
ObserveOn和subscribeOn方法
- subscribeOn是通过新建Observable的方式,使用OnSubscribe类的方式去做到线程切换的
- observeOn是通过operator操作符的形式去完成线程切换的,所以他的作用域和其他操作符一样,是调用observeOn之后的链路
- observeOn()指定的是它之后的操作所在的线程,通过observeOn()的多次调用,程序实现了线程的多次切换
- subscribeOn()的位置放在哪里都可以,但是他只能调用一次,原因就是subscribeOn是通过新建的Observable的方式
以上是关于RxJava1 学习笔记的主要内容,如果未能解决你的问题,请参考以下文章