带你一起探究Rxjava源码,学会Rxjava竟如此简单

Posted 终端研发部

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了带你一起探究Rxjava源码,学会Rxjava竟如此简单相关的知识,希望对你有一定的参考价值。

http://www.jianshu.com/p/daa77f6acc25

声明原创|本文为孙凯授权发布,未经允许请勿转载

正文

为什么要用 RxJava

简洁!简洁!简洁!(重要的事情说三遍)

RxJava 最大的优点就是简洁。简洁的代码能让人心旷神怡,减少 bug 。

RxJava 是一种新的编程模式 响应式编程

响应式编程是一种基于异步数据流概念的编程模式。
数据流就像一条河:它可以被观测,被过滤,被操作,或者为新的消费者与另外一条流合并为一条新的流。

以上是 RxJava Essentials 中文翻译版 对响应式编程的介绍。使用 RxJava 可以让我们在 java 语言中体验什么是响应式编程。响应式编程有两个重要概念

1. 基于异步
2. 数据流

如何使用 RxJava

  1. RxJava 的设计理念基于 观察者模式

  2. 在 RxJava 中首先明白有两个对象观察者和被观察者。

  3. 观察者和被观察者可以存在不同的线程之中,所以存在观测线程和被观测线程

  4. 观察者和被观察者通过 subscribe 发生『订阅』关系。

RxJava 提供一些列的链式调用,使用起来如下:

 
   
   
 
Observable
   .create(...)
   .observeOn(...)
   .subscribeOn(...)
   .subscribe(...)

subscribe() 方法为链式调用的最后一层,create() 和 subscribe() 方式之前可以任意设置其他操作。

上面提到『响应式编程』中的数据流就像是一条河。
create() 方法可以比喻为河流的『上游发源地』,subscribe() 则为河流的『入海口』。
在这两个方法之间我们可以添加观察、过滤等操作。

在链式调用中增加一些数据处理

Observable
   .create(...)
   .observeOn(...)
   .subscribeOn(...)
   .map(...)
   .filter(...)
   .subscribe(...)

在 RxJava2 中提供一系列可观测对象(也就是上面链式调用的 Observable 等同功能)

  • io.reactivex.Flowable

  • io.reactivex.Observable

  • io.reactivex.Single

  • io.reactivex.Completable

  • io.reactivex.Maybe

这里我们写一个例子

Observable.create((ObservableOnSubscribe<String>) e -> {
       for (int i = 0; i < 5; i++) {
           e.onNext(i + "");
       }
   })
           .observeOn(Schedulers.io())
           .subscribeOn(Schedulers.io())
           .map(s -> {
               System.out.println("map:" + s);
               return s + "_map";
           })
           .filter(o -> {
               System.out.println("flat:" + o);
               if (o.compareTo("3") < 0) {
                   return true;
               }
               return false;
           })
           .subscribe(o -> System.out.println("subscribe:" + o));

输出结果如下:

map:0
flat:0_map
subscribe:0_map
map:1
flat:1_map
subscribe:1_map
map:2
flat:2_map
subscribe:2_map
map:3
flat:3_map
map:4
flat:4_map

上面的例子中,我们在被观察者中发射了 5 个数据源,观察者和被观察着都在同一个线程中,通过 map 对象给每个发射的对象拼接一个 『_map』字符串,通过 filter 过滤了比『3』字符串小的对象。

所以最终在观察者中接收到了 3 次消息。

源码分析 RxJava 中的核心代码

订阅关系的产生

创建被观察着

『被观测者』是事件产生的一方,创建方式也有很多种。这里列举一下 Observable 创建的方式

1.通过 create() 方法创建

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

2.通过 just() 方法创建

public static <T> Observable<T> just(T item) {
    ObjectHelper.requireNonNull(item, "The item is null");
    return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}

3.通过 fromArray() 方法创建

public static <T> Observable<T> fromArray(T... items) {
    ObjectHelper.requireNonNull(items, "items is null");
    if (items.length == 0) {
        return empty();
    } else
    if (items.length == 1) {
        return just(items[0]);
    }
    return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
}

其中 ObservableFromArray、ObservableJust、ObservableCreate 都是 Observable 的子类,而 Observable 本身是一个抽象。

这些子类主要实现 Observe 的抽象方法

protected abstract void subscribeActual(Observer<? super T> observer);

再看一下 RxJavaPlugins.onAssembly() 方法

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
   Function<? super Observable, ? extends Observable> f = onObservableAssembly;
   if (f != null) {
       return apply(f, source);
   }
   return source;
}

这里需要说明一下 RxJavaPlugins.onAssembly()是一个 Hock,如果不做任何 hock 处理,RxJavaPlugins.onAssembly()会直接返回传入的对象。onObservableAssembly 静态成员变量为 null。

我们用 ObservableCreate 举例

ObservableCreate 的构造方法需要传入一个 ObservableOnSubscribe 对象并重载 Observable 的 subscribeActual()

protected void subscribeActual(Observer<? super T> observer) {
  CreateEmitterparent = new CreateEmitter(observer);
  observer.onSubscribe(parent);

try {
   source.subscribe(parent);
} catch (Throwable ex) {
   Exceptions.throwIfFatal(ex);
   parent.onError(ex);
}
}

subscribeActual() 方法传入了一个 Observer 对象并且包装到 CreateEmitter 对象中,然后调用
observer.onSubscribe(parent); 和 source.subscribe(parent);

创建观察者

观察者比较简单,需要实现 4 个方法

new Observer() {
  @Override
  public void onSubscribe(Disposable d) {

}

@Override
public void onNext(String s) {

}

@Override
public void onError(Throwable e) {

}

@Override
public void onComplete() {

}
}

订阅

通常情况下,我们不需要重载 Observer 的每一个方法,RxJava 内部提供了另一个 LambdaObserver 把 Observer 的四个方法拆分为 4 个部分。

1、Observable.subscribe() 可以只传入一个 Consumer 对象。

2、内部会把 Consumer 包裹在 LambdaObserver 中,并且返回 LambdaObserver

public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
   Action onComplete, Consumer<? super Disposable> onSubscribe)
{
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

subscribe(ls);

return ls;
}

3、然后调用接收 Observer 的 subscribe() 方法

@Override
public final void subscribe(Observer<? super T> observer) {
  ObjectHelper.requireNonNull(observer, “observer is
null”);
 
try {
      observer = RxJavaPlugins.onSubscribe(
this, observer);

   ObjectHelper.requireNonNull(observer,
"Plugin returned null Observer");

   subscribeActual(observer);
} ……
}

4、这里调用了 Observable 的 subscribeActual(observer) 方法。
这里就完成了 观察者 和 被观察着 之间的订阅关系

带你一起探究Rxjava源码,学会Rxjava竟如此简单

如下一段代码

Observable.create(new ObservableOnSubscribe<String>() {
   @Override
   public void subscribe(ObservableEmitter<String> e) throws Exception {
       for (int i = 0; i < 5; i++) {
           System.out.println("subscribe:" + i);
           e.onNext(i + "");
       }
   }
})
       .subscribe(new Consumer<String>() {
           @Override
           public void accept(String s) throws Exception {
               System.out.println("accept:" + s);
           }
       });

调用的时序图如下

带你一起探究Rxjava源码,学会Rxjava竟如此简单

线程调度原理分析

上部分分析的订阅关系的创建,都是在当前线程之中。RxJava 可以指定 观察线程 和 观察者线程

observeOn 原理分析

Observable 的 observeOn 方法有三个

  1. Observable observeOn(Scheduler scheduler)

  2. Observable observeOn(Scheduler scheduler, boolean delayError)

  3. Observable observeOn(Scheduler scheduler, boolean delayError, int bufferSize)

这三个方法中,前两个方法都会再次调用第三个方法

public final ObservableobserveOn(Scheduler scheduler, boolean delayError, int bufferSize) {
  ObjectHelper.requireNonNull(scheduler, “scheduler is null”);
  ObjectHelper.verifyPositive(bufferSize, “bufferSize”);
  return RxJavaPlugins.onAssembly(new ObservableObserveOn(this, scheduler, delayError, bufferSize));
}

我们看到这里创建了一个 ObservableObserveOn 对象,ObservableObserveOn是 Observable的子类。

我们看下 ObservableObserveOn 的 subscribeActual

protected void subscribeActual(Observer<? super T> observer) {
  if (scheduler instanceof TrampolineScheduler) {
      source.subscribe(observer);
  } else {
      Scheduler.Worker w = scheduler.createWorker();

   source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}

1、这里我们假设传入的 Scheduler 是 Schedulers.io() 进一步跟踪分析会发现 Schedulers.io() 返回的是 ioscheduler 所以会走上面代码的 else 分支。

2、先忽略 scheduler.createWorker() 过程,先看下 ObserveOnObserver

这里的 source 是 ObservableCreate ,而 source.subscribe() 会调用 ObservableCreate.subscribeActual(observer) 然后调用 ObserveOnObserver.onSubscribe() 方法

@Override
public void onSubscribe(Disposable s) {
   if (DisposableHelper.validate(this.s, s)) {
       this.s = s;
       if (s instanceof QueueDisposable) {
           ……
       }
       queue = new SpscLinkedArrayQueue<T>(bufferSize);
       actual.onSubscribe(this);
   }
}

这里传入的 Disposable 是 CreateEmitter 对象,所以不会走 if 分支。

然后创建了一个 SpscLinkedArrayQueue 对象,

紧接着执行 actual.onSubscribe() 也就是 LambdaObserver.onSubscribe()

后面应该执行的是 ObserveOnObserver.onNext() 方法

public void onNext(T t) {
   if (done) {
       return;
   }

   if (sourceMode != QueueDisposable.ASYNC) {
       queue.offer(t);
   }
   schedule();
}

看到会把 onNext(T t) 传入参数放入队列之中,然后执行 schedule

void schedule() {
   if (getAndIncrement() == 0) {
       worker.schedule(this);
   }
}

worker.schedule() 接收的是一个 Runnable 对象,所以我们从这里可以看出 Observer 的 onNext() 、onComplete()、onError() 等方法都是在线程之中执行。

接下来我们看下线程的创建

从 ObservableObserveOn 的 subscribeActual() 方法中的

Scheduler.Worker w = scheduler.createWorker();
Schedulers.io()的跟踪过程比较简单,最终会得到一个 IoScheduler

public IoScheduler(ThreadFactory threadFactory) {
  this.threadFactory = threadFactory;
  this.pool = new AtomicReference(NONE);
  start();
}

直接看 createWorker()

public Worker createWorker() {
  return new EventLoopWorker(pool.get());
}

再看 EventLoopWorker 的构造函数

EventLoopWorker(CachedWorkerPool pool) {
   this.pool = pool;
   this.tasks = new CompositeDisposable();
   this.threadWorker = pool.get();
}

继续看下 CachedWorkerPool 的构造方法

CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
   this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
   this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
   this.allWorkers = new CompositeDisposable();
   this.threadFactory = threadFactory;

   ScheduledExecutorService evictor = null;
   Future<?> task = null;
   if (unit != null) {
       evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
       task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
   }
   evictorService = evictor;
   evictorTask = task;
}

终于我们找到了线程池相关的代码

evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);

pool.get() 方法

ThreadWorker get() {
   if (allWorkers.isDisposed()) {
       return SHUTDOWN_THREAD_WORKER;
   }
   while (!expiringWorkerQueue.isEmpty()) {
       ThreadWorker threadWorker = expiringWorkerQueue.poll();
       if (threadWorker != null) {
           return threadWorker;
       }
   }

   // No cached worker found, so create a new one.
   ThreadWorker w = new ThreadWorker(threadFactory);
   allWorkers.add(w);
   return w;
}

这里返回了一个 ThreadWorker

看下 ThreadWorker 的创建过程

ThreadWorker(ThreadFactory threadFactory) {
   super(threadFactory);
   this.expirationTime = 0L;
}

看下父类的构造函数

public NewThreadWorker(ThreadFactory threadFactory) {
  executor = SchedulerPoolFactory.create(threadFactory);
}

再跟下去

public static ScheduledExecutorService create(ThreadFactory factory) {
  final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
  if (exec instanceof ScheduledThreadPoolExecutor) {
      ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
      POOLS.put(e, exec);
  }
  return exec;
}

这里又出现了一个线程池

下面开始看 worker.schedule(this)

public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
   if (tasks.isDisposed()) {
       // don't schedule, we are unsubscribed
       return EmptyDisposable.INSTANCE;
   }

   return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}

继续跟踪下去

public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
  Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

if (parent != null) {
   if (!parent.add(sr)) {
       return sr;
   }
}

Future<?> f;
try {
   if (delayTime <= 0) {
       f = executor.submit((Callable<Object>)sr);
   } else {
       f = executor.schedule((Callable<Object>)sr, delayTime, unit);
   }
   sr.setFuture(f);
} catch (RejectedExecutionException ex) {
   if (parent != null) {
       parent.remove(sr);
   }
   RxJavaPlugins.onError(ex);
}

return sr;
}

终于我们找到了执行线程的方法
总结一下真个流程

带你一起探究Rxjava源码,学会Rxjava竟如此简单

由于篇幅较长,请到阅读原文进行学习。

阅读更多


相信自己,没有做不到的,只有想不到的

在这里获得的不仅仅是技术!


日   更   精   彩

微  信  号:codeGoogler

终端研发部


如果你觉得此文对您有所帮助,可以转发给身边的朋友,一起做一个乐于分享的小猿猿


            

这里学到不仅仅是技术

以上是关于带你一起探究Rxjava源码,学会Rxjava竟如此简单的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava 源码解析 - subscribe源码

Rxjava源码分析&实践RxJava基本原理分析之订阅流

Rxjava 源码解析 - subscribe源码

Carson带你学Android:手把手带你入门神秘的Rxjava

一文带你全面了解RxJava

带你走通 OkHttp+Retrofit+Rxjava