带你一起探究Rxjava源码,学会Rxjava竟如此简单
Posted 终端研发部
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了带你一起探究Rxjava源码,学会Rxjava竟如此简单相关的知识,希望对你有一定的参考价值。
http://www.jianshu.com/p/daa77f6acc25
声明原创|本文为孙凯授权发布,未经允许请勿转载
为什么要用 RxJava
简洁!简洁!简洁!(重要的事情说三遍)
RxJava 最大的优点就是简洁。简洁的代码能让人心旷神怡,减少 bug 。
RxJava 是一种新的编程模式 响应式编程
响应式编程是一种基于异步数据流概念的编程模式。
数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。
以上是 RxJava Essentials 中文翻译版 对响应式编程的介绍。使用 RxJava 可以让我们在 java 语言中体验什么是响应式编程。响应式编程有两个重要概念
1. 基于异步
2. 数据流
如何使用 RxJava
RxJava 的设计理念基于 观察者模式
在 RxJava 中首先明白有两个对象观察者和被观察者。
观察者和被观察者可以存在不同的线程之中,所以存在观测线程和被观测线程
观察者和被观察者通过 subscribe 发生『订阅』关系。
RxJava 提供一些列的链式调用,使用起来如下:
Observable
.create(...)
.observeOn(...)
.subscribeOn(...)
.subscribe(...)
subscribe() 方法为链式调用的最后一层,create() 和 subscribe() 方式之前可以任意设置其他操作。
上面提到『响应式编程』中的数据流就像是一条河。
create() 方法可以比喻为河流的『上游发源地』,subscribe() 则为河流的『入海口』。
在这两个方法之间我们可以添加观察、过滤等操作。
在链式调用中增加一些数据处理
Observable
.create(...)
.observeOn(...)
.subscribeOn(...)
.map(...)
.filter(...)
.subscribe(...)
在 RxJava2 中提供一系列可观测对象(也就是上面链式调用的 Observable 等同功能)
io.reactivex.Flowable
io.reactivex.Observable
io.reactivex.Single
io.reactivex.Completable
io.reactivex.Maybe
这里我们写一个例子
Observable.create((ObservableOnSubscribe<String>) e -> {
for (int i = 0; i < 5; i++) {
e.onNext(i + "");
}
})
.observeOn(Schedulers.io())
.subscribeOn(Schedulers.io())
.map(s -> {
System.out.println("map:" + s);
return s + "_map";
})
.filter(o -> {
System.out.println("flat:" + o);
if (o.compareTo("3") < 0) {
return true;
}
return false;
})
.subscribe(o -> System.out.println("subscribe:" + o));
输出结果如下:
map:0
flat:0_map
subscribe:0_map
map:1
flat:1_map
subscribe:1_map
map:2
flat:2_map
subscribe:2_map
map:3
flat:3_map
map:4
flat:4_map
上面的例子中,我们在被观察者中发射了 5 个数据源,观察者和被观察着都在同一个线程中,通过 map 对象给每个发射的对象拼接一个 『_map』字符串,通过 filter 过滤了比『3』字符串小的对象。
所以最终在观察者中接收到了 3 次消息。
源码分析 RxJava 中的核心代码
订阅关系的产生
创建被观察着
『被观测者』是事件产生的一方,创建方式也有很多种。这里列举一下 Observable 创建的方式
1.通过 create() 方法创建
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
2.通过 just() 方法创建
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
3.通过 fromArray() 方法创建
public static <T> Observable<T> fromArray(T... items) {
ObjectHelper.requireNonNull(items, "items is null");
if (items.length == 0) {
return empty();
} else
if (items.length == 1) {
return just(items[0]);
}
return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}
其中 ObservableFromArray、ObservableJust、ObservableCreate 都是 Observable 的子类,而 Observable 本身是一个抽象。
这些子类主要实现 Observe 的抽象方法
protected abstract void subscribeActual(Observer<? super T> observer);
再看一下 RxJavaPlugins.onAssembly() 方法
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
这里需要说明一下 RxJavaPlugins.onAssembly()是一个 Hock,如果不做任何 hock 处理,RxJavaPlugins.onAssembly()会直接返回传入的对象。onObservableAssembly 静态成员变量为 null。
我们用 ObservableCreate 举例
ObservableCreate 的构造方法需要传入一个 ObservableOnSubscribe 对象并重载 Observable 的 subscribeActual()
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitterparent = new CreateEmitter(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
subscribeActual() 方法传入了一个 Observer 对象并且包装到 CreateEmitter 对象中,然后调用
observer.onSubscribe(parent); 和 source.subscribe(parent);
创建观察者
观察者比较简单,需要实现 4 个方法
new Observer() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
}
订阅
通常情况下,我们不需要重载 Observer 的每一个方法,RxJava 内部提供了另一个 LambdaObserver 把 Observer 的四个方法拆分为 4 个部分。
1、Observable.subscribe() 可以只传入一个 Consumer 对象。
2、内部会把 Consumer 包裹在 LambdaObserver 中,并且返回 LambdaObserver
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
3、然后调用接收 Observer 的 subscribe() 方法
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, “observer is null”);
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} ……
}
4、这里调用了 Observable 的 subscribeActual(observer) 方法。
这里就完成了 观察者 和 被观察着 之间的订阅关系
如下一段代码
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
for (int i = 0; i < 5; i++) {
System.out.println("subscribe:" + i);
e.onNext(i + "");
}
}
})
.subscribe(new Consumer<String>() {
@Override
public void accept(String s) throws Exception {
System.out.println("accept:" + s);
}
});
调用的时序图如下
线程调度原理分析
上部分分析的订阅关系的创建,都是在当前线程之中。RxJava 可以指定 观察线程 和 观察者线程
observeOn 原理分析
Observable 的 observeOn 方法有三个
Observable
observeOn(Scheduler scheduler) Observable observeOn(Scheduler scheduler, boolean delayError) Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize)
public final ObservableobserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, “scheduler is null”);
ObjectHelper.verifyPositive(bufferSize, “bufferSize”);
return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}
我们看下 ObservableObserveOn 的 subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
1、这里我们假设传入的 Scheduler 是 Schedulers.io() 进一步跟踪分析会发现 Schedulers.io() 返回的是 ioscheduler 所以会走上面代码的 else 分支。
2、先忽略 scheduler.createWorker() 过程,先看下 ObserveOnObserver
这里的 source 是 ObservableCreate ,而 source.subscribe() 会调用 ObservableCreate.subscribeActual(observer) 然后调用 ObserveOnObserver.onSubscribe() 方法
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
……
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
这里传入的 Disposable 是 CreateEmitter 对象,所以不会走 if 分支。
然后创建了一个 SpscLinkedArrayQueue 对象,
紧接着执行 actual.onSubscribe() 也就是 LambdaObserver.onSubscribe()
后面应该执行的是 ObserveOnObserver.onNext() 方法
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
看到会把 onNext(T t) 传入参数放入队列之中,然后执行 schedule
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
worker.schedule() 接收的是一个 Runnable 对象,所以我们从这里可以看出 Observer 的 onNext() 、onComplete()、onError() 等方法都是在线程之中执行。
接下来我们看下线程的创建
从 ObservableObserveOn 的 subscribeActual() 方法中的
Scheduler.Worker w = scheduler.createWorker();
Schedulers.io()的跟踪过程比较简单,最终会得到一个 IoScheduler
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference(NONE);
start();
}
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
再看 EventLoopWorker 的构造函数
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
继续看下 CachedWorkerPool 的构造方法
CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
this.allWorkers = new CompositeDisposable();
this.threadFactory = threadFactory;
ScheduledExecutorService evictor = null;
Future<?> task = null;
if (unit != null) {
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
}
evictorService = evictor;
evictorTask = task;
}
终于我们找到了线程池相关的代码
evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
pool.get() 方法
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
这里返回了一个 ThreadWorker
看下 ThreadWorker 的创建过程
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
看下父类的构造函数
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
再跟下去
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
if (exec instanceof ScheduledThreadPoolExecutor) {
ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
POOLS.put(e, exec);
}
return exec;
}
这里又出现了一个线程池
下面开始看 worker.schedule(this)
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
继续跟踪下去
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
终于我们找到了执行线程的方法
总结一下真个流程
由于篇幅较长,请到阅读原文进行学习。
阅读更多
相信自己,没有做不到的,只有想不到的
在这里获得的不仅仅是技术!
日 更 精 彩
微 信 号:codeGoogler
—终端研发部—
如果你觉得此文对您有所帮助,可以转发给身边的朋友,一起做一个乐于分享的小猿猿
这里学到不仅仅是技术
以上是关于带你一起探究Rxjava源码,学会Rxjava竟如此简单的主要内容,如果未能解决你的问题,请参考以下文章