RxJava之七——RxJava 2.0 图文分析create() subscribe()map()observeOn()subscribeOn()源码
Posted 薛瑄
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava之七——RxJava 2.0 图文分析create() subscribe()map()observeOn()subscribeOn()源码相关的知识,希望对你有一定的参考价值。
前言
16年 的时候写过两篇关于Rxjava 1.0 的源码分析,时过境迁,现在早已是2.0 了。2.0 的代码逻辑,封装,更为易懂,也包含了 一些新特性背压,面向切面等等。所以决定,写篇文章分析RxJava 2.0
关于RxJava,从表面上看起来很容易使用,但是如果理解不够深刻,使用过程中,往往会出现一些问题,所以我写了系列文章,从入门到精通,从简单的使用到部分源码详解,希望能给读者一个质的飞跃:
1、RxJava之一——一次性学会使用RxJava RxJava简单的使用和使用它的好处
2、RxJava之二——Single和Subject 与Observable举足轻重的类,虽然用的少,但应该知道
3、RxJava之三——RxJava 2.0 全部操作符示例
4、RxJava之四—— Lift()详解 想要了解Operators,Lift()一定要学习
5、RxJava之五—— observeOn()与subscribeOn()的详解Scheduler线程切换的原理
6、RxJava之六——RxBus 通过RxJava来替换EventBus
7、RxJava之七——RxJava 2.0 图文分析create()、 subscribe()、map()、observeOn()、subscribeOn()源码 这张图可能是全网最详细 明了的图
Rxjava2.x 与1.x 的相关文章:
关于 RxJava 最友好的文章—— RxJava 2.0 全新来袭
官方文档:What’s different in 2.0
RxJava github
示例
Rxjava的使用流程,相信大家都很清楚了,以下面这个简单的demo,重点分析一下create()、 subscribe()、map()、observeOn()、subscribeOn()源码。 只要了解这些源码,再去看其他的类都会有似曾相识的感觉
Observable.create(object : ObservableOnSubscribe<Int>
override fun subscribe(emitter: ObservableEmitter<Int>)
emitter.onNext(1)
)
.map(object : Function<Int, String>
override fun apply(t: Int): String
return t.toString()
)
.subscribeOn(Schedulers.io())
.observeOn(androidSchedulers.mainThread())
.subscribe(object : Observer<String>
override fun onComplete()
override fun onSubscribe(d: Disposable)
override fun onNext(t: String)
override fun onError(e: Throwable)
)
在看源代码时,被其中一层一层的封装和调用,类名搞的晕晕的,一不留神就不知道谁是谁。
我把源代码的简单流程 ,画了个图,先脑子里有个整体的轮廓,再去看代码,会清晰很多
关于流程图的介绍:
这张图,可能是全网关于rxjava2 大体流程,最详细的图。为了排版和方便理解,简化了函数的关系,忽略了很多细节
-
绿色模块 表示订阅者,例如
Observable
或Flowable
,或者他们的子类 -
蓝色模块 表示观察者,例如
Observer
或Subscriber
,或者他们的子类 -
青色模块 表示数据发送,例如:
ObservableOnSubscribe
,ObservableSource
,等等 -
黄色模块 表示切换了线程
-
每个模块右上角表示当前类的名称
-
绿色的线 表示函数调用
-
蓝色的线 表示订阅
-
红色的线 表示数据发送
-
黑色的线 表示对象是什么或者对象从哪里来的
基本流程
还是以上面的demo为例,抛开操作符,只分析主要流程
先来看下,demo中都使用哪些类和接口:
- Observable 订阅者
- Observer 观察者
- ObservableOnSubscribe 数据发送源
public interface ObservableOnSubscribe<T>
// ObservableEmitter 也是一个接口,它继承接口Emitter
void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;
//这个接口,有常见的几种函数。在demo中可以看到,调用的oNext 就是这个接口的函数
public interface Emitter<T>
void oNext(@NonNull T value);
void onError(@NonNull Throwable error);
void onComplete();
下面一步一步进入源码:
create
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
// 判断非空
ObjectHelper.requireNonNull(source, "source is null");
//RxJavaPlugins 相当于是切面编程,会对所有的中间订阅者进行自定义修改,如果没有设置过。就直接当前参数
//创建一个 ObservableCreate,它就是上图中的绿色模块
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
来看一个RxJavaPlugins.onAssembly
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source)
// onObservableAssembly 需要手动设置,实现切面效果
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
//如果没有设置 就是null,直接返回参数source
if (f != null)
return apply(f, source);
return source;
create 传入的是ObservableOnSubscribe
类型参数,需要的Observable
返回类型,这里的ObservableCreate 继承了Observable
并实现接口subscribeActual
,引用了ObservableOnSubscribe
并调用它的接口subscribe
,所以它算是一种适配器模式。
public final class ObservableCreate<T> extends Observable<T>
// 每个Observable 的实现类,都有一个source,表示的是上游
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source)
this.source = source;
// subscribe 最终会调用到这里
// observer 是观察者,它里面实现了onNext、onSubscribe 等 这些函数
@Override
protected void subscribeActual(Observer<? super T> observer)
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//调用了观察者的onSubscribe 方法
observer.onSubscribe(parent);
try
//这个上游source 就是demo中的ObservableOnSubscribe 实现,
//调用了subscribe,以本例来说,就会执行 emitter.onNext(1)
// 那么问题来了,subscribeActual 会在什么时候执行呢?
source.subscribe(parent);
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
parent.onError(ex);
subscribe
需要注意,代码执行到这里,是谁调用了subscribe,create() 返回的是Observable
类型 ObservableCreate
,所以是ObservableCreate
调用了subscribe
,那么关于Observable
的接口,自然也会调用到ObservableCreate
public final void subscribe(Observer<? super T> observer)
ObjectHelper.requireNonNull(observer, "observer is null");
try
// 面向切面的设置,如果没有设置,直接返回observer
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//以本例来看,就是执行ObservableCreate 中的subscribeActual
subscribeActual(observer);
catch (NullPointerException e) // NOPMD
throw e;
catch (Throwable e)
... 省略代码 ...
ObservableCreate 类
// subscribe 最终会调用到这里
// observer 是观察者,它里面实现了onNext、onSubscribe 等 这些函数
@Override
protected void subscribeActual(Observer<? super T> observer)
// 把观察者,封装成CreateEmitter
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//调用了观察者的onSubscribe 方法
observer.onSubscribe(parent);
try
//这个上游source 就是demo中的ObservableOnSubscribe 实现,
//调用了subscribe,以本例来说,就会执行 emitter.onNext(1),也就是执行了parent.onNext(1)
source.subscribe(parent);
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
parent.onError(ex);
最后进入CreateEmitter
来看一下:
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable
private static final long serialVersionUID = -3434801548987643227L;
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer)
//demo 中的observer 最终被传递到这里
this.observer = observer;
// 在subscribeActual 调用到这里
@Override
public void onNext(T t)
if (t == null)
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
if (!isDisposed())
//如果没有disposed ,就会执行demo中observer的onNext
observer.onNext(t);
... 省略代码 ...
@Override
public void onComplete()
//如果没有被dispose,会调用Observer的onComplete()方法
if (!isDisposed())
try
observer.onComplete();
finally
//执行完调用dispose
dispose();
... 省略代码 ...
源码看到这里,再去看上图,思路应该更清晰,但是应该对整图还不是很了解。
仔细总结一下会发现,在create后,创建了ObservableCreate ,他知道上游(source),知道被订阅后的处理方式(subscribeActual )也就是如何把数据发给下游,但是它需要等待调用subscribe,才会最终触发这个流程。
而且ObservableCreate 是继承于Observable ,对设计模式敏感的小伙伴,可能会想到装饰着模式,没错,所谓的操作符,无非就是用装饰着模式包裹一层,让他也知道上游(source),知道如何数据发给下游(实现subscribeActual ),最终subscribe 一调用,这个过程就被触发。
如果感觉混乱,没关系,下面跟着源码走一下,就会豁然开朗
map操作符
调用map,会执行下面的函数
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
ObjectHelper.requireNonNull(mapper, "mapper is null");
//发现了吗,上面的create 创建ObservableCreate , map 创建了 ObservableMap
//没错他们都是Observable的子类
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
有了新的Observable (ObservableMap),那肯定得有新的Observer,不然ObservableMap和谁关联呢?是的,新的Observer就是 MapObserver
//AbstractObservableWithUpstream 继承于Observable,有成员变量source
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U>
final Function<? super T, ? extends U> function;
//这里传入的是this,也就是本例中的ObservableCreate ,它继承于Observable 继承于ObservableSource
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function)
// 把上游的源保存起来
super(source);
//保存当前的操作函数,也就是demo 中map 里实现的函数
this.function = function;
// 太熟悉了,又是它,等ObservableMap 调用subscribe 的时候,会调用到这里
@Override
public void subscribeActual(Observer<? super U> t)
// 调用了上游(source)的subscribe,这就相当于触发了上游
// 传入的参数是MapObserver,联想到上面的subscribe 直接订阅Observer
// 这个MapObserver继承Observer,它的参数t 也是Observer,也就是把下游的Observer 包裹了一层,传递给上游。这正是装饰者模式
// 本例中emitter.onNext(1)执行后,也就是会执行MapObserver中的onNext
source.subscribe(new MapObserver<T, U>(t, function));
//BasicFuseableObserver 继承了Observer,有成员变量downstream、upstream 等
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U>
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper)
// 把下游的Observer 保存在downstream 中
super(actual);
// 保存map 变换操作
this.mapper = mapper;
// 本例中,onNext 是在emitter.onNext(1) 执行后,调用到这里的
@Override
public void onNext(T t)
if (done)
return;
if (sourceMode != NONE)
downstream.onNext(null);
return;
//目标类型
U v;
try
// 实现了变化操作,把t 转为 v
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
catch (Throwable ex)
fail(ex);
return;
//调用了下游onNext,继续分发数据,此时的数据是转换后的目标数据
downstream.onNext(v);
@Override
public int requestFusion(int mode)
return transitiveBoundaryFusion(mode);
@Nullable
@Override
public U poll() throws Exception
T t = qd.poll();
return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
此时再去看看上面的图,是不是有点感觉了,没错,全部都是这个套路,只是每次在装饰的时候,行为不一样
subscribeOn操作符
作用是控制subscribe
的线程,下面来看看subscribeOn
是如何实现的
不仔细看代码,还以为作者直接把实现map的代码拷贝了一份,简直太相似了
public final Observable<T> subscribeOn(Scheduler scheduler)
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
//使用原来的Observable 和调度线程,创建一个新的Observable,就是ObservableSubscribeOn
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
依旧是创建新的Observable(ObservableSubscribeOn) 和 Observer (SubscribeOnObserver)
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler)
// 依旧是保存了上游 ObservableSource
super(source);
// 保存了线程调度的Scheduler
this.scheduler = scheduler;
@Override
public void subscribeActual(final Observer<? super T> observer)
// 使用装饰着模式,把原来的Observer 封装了一层
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
//注意,这里还没有切换新城,调用了onSubscribe
observer.onSubscribe(parent);
//这里是重点,
//scheduler.scheduleDirect(new SubscribeTask(parent)) 在新线程中执行parent,
//parent.setDisposable 把新线程任务,加入到DisposableHelper,如果手动dispose后,保证线程可以停止
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable
private static final long serialVersionUID = 8094547886072529208L;
final Observer<? super T> downstream;
final AtomicReference<Disposable> upstream;
SubscribeOnObserver(Observer<? super T> downstream)
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
@Override
public void onSubscribe(Disposable d)
DisposableHelper.setOnce(this.upstream, d);
// 下面的常规操作方法,就是调用downstream 的各个操作方法
@Override
public void onNext(T t)
downstream.onNext(t);
@Override
public void onError(Throwable t)
downstream.onError(t);
@Override
public void onComplete()
downstream.onComplete();
@Override
public void dispose()
DisposableHelper.dispose(upstream);
DisposableHelper.dispose(this);
@Override
public boolean isDisposed()
return DisposableHelper.isDisposed(get());
void setDisposable(Disposable d)
//把当前
DisposableHelper.setOnce(this, d);
// 实现了线程的Runnable 接口,目的是让线程可以调度
final class SubscribeTask implements Runnable
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent)
this.parent = parent;
@Override
public void run()
// 会在新的线程中调用,source是上游的Observable
source.subscribe(parent);
截止到这里,subscribeOn总体的逻辑已经,搞清楚了,再深入一点看一下scheduler.scheduleDirect(new SubscribeTask(parent))
是如何实现线程切换的
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run)
//调用scheduleDirect,参数表示立即执行
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit)
//创建了一个worker,注意这里的createWorker(),是一个抽象方法,不同的线程,创建的worker 不一样。
// 例如:Schedulers.io() 创建的是 EventLoopWorker
final Worker w = createWorker();
// 依旧是切面编程,对每个切换线程的, 包裹一层
final Runnable decoratedRun = RxJavaPlugins.onScheduleRxJava入门之路