RxJava原理解析
Posted 码农小风
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava原理解析相关的知识,希望对你有一定的参考价值。
一、RXjava介绍
首先看一下Rxjava这个名字,其中java代表java语言,而Rx是什么意思呢?Rx是Reactive Extensions的简写,翻译过来就是,响应式拓展。所以Rxjava的名字的含义就是,对java语言的拓展,让其可以实现对数据的响应式编程。
那么响应的是什么呢?响应的是上游数据的变化。常规用法是,对数据源进行监听,然后做出响应。
RxJava的整体结构是一条链,其中有这三个角色。
- 链的上游:生产者 Observable
- 链的下游:观察者 Observer
- 链的中间:各个中介节点,既是下游的Observable,又是上游的Observer
二、Rxjava基本使用
Single.just("hfhuaizhi").subscribe(object : SingleObserver<String>
override fun onSubscribe(d: Disposable)
Log.e(TAG, "onSubscribe")
override fun onSuccess(t: String)
Log.e(TAG, "onSuccess:$t")
override fun onError(e: Throwable)
Log.e(TAG, "onError:$e")
)
上面这段代码是对Rxjava简单的使用,其中
- Single 发出单个数据的被观察者Observable,只发送一次,只有Success和Error两种状态,没有next,在Rxjava2中新增
- just 被观察者生产的数据,参数类型是一个泛型,这里传进去的是一个String
- subscribe 观察者Observer,这里声明的是SingleObserver,用来对Single中产生的数据进行响应
- SingleObserver
- onSubscribe 订阅成功后就会回调,一般会在此方法中进行一些初始化操作。其参数类型是Disposable,可以通过调用d.dispose() 取消对Observable的监听,并让其停止发送消息。
- onSuccess 接收数据成功后就会回调,只会回调一次,其参数类型和Observable中just方法传入的数据类型一致,这里是String类型
- onError 发生错误时回调,参数是Throwable,包含错误信息。
运行效果
2021-12-18 13:54:12.450 29223-29223/com.hfhuaizhi.rxjavatest E/hftest: onSubscribe
2021-12-18 13:54:12.451 29223-29223/com.hfhuaizhi.rxjavatest E/hftest: onSuccess:hfhuaizhi
可以看到首先onSubscribe被调用,表明注册了观察者。然后接收数据成功,打印出’hfhuaizhi’。 到这里我们就了解了Rxjava最基本的用法,接下来分析一下函数的内部做了什么。
三、Rxjava原理解析
1. just方法分析
public static <@NonNull T> Single<T> just(T item)
Objects.requireNonNull(item, "item is null");
return RxJavaPlugins.onAssembly(new SingleJust<>(item));
-
对方法参数进行判空
-
调用
RxJavaPlugins.onAssembly
方法,其参数是一个SingleJust,构造方法传入了item
- 其中onAssembly方法内部对传入的参数进行一些处理,然后返回原参数类型,所以接下来分析的过程中会忽略此方法,可以简单认为just方法直接返回了一个SingleJust实例。
// onSingleAssembly 参数默认是空的,所以这个方法原样返回了source,当设置onSingleAssembly后,
// 会先对source进行处理后再返回
public static <@NonNull T> Single<T> onAssembly(@NonNull Single<T> source)
Function<? super Single, ? extends Single> f = onSingleAssembly;
if (f != null)
return apply(f, source);
return source;
final T value;
public SingleJust(T value)
this.value = value;
SingleJust将构造方法传入的item保存在value字段中。 由上述分析可知,Single.just
方法会返回一个SingleJust实例,所以在我们链式调用中的subscribe方法,实际上调用的是SingleJust的subscribe方法
public final void subscribe(@NonNull SingleObserver<? super T> observer)
// 1. 判空
Objects.requireNonNull(observer, "observer is null");
// 2. 对参数中的observer进行处理后又返回observer
observer = RxJavaPlugins.onSubscribe(this, observer);
// 3. 对Observer进行判空
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null SingleObserver. Please check the handler provided to RxJavaPlugins.setOnSingleSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
try
// 4. 调用真实注册方法
subscribeActual(observer);
catch (NullPointerException ex)
throw ex;
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
NullPointerException npe = new NullPointerException("subscribeActual failed");
npe.initCause(ex);
throw npe;
subscrib方法中主要做了注释中所写的四步操作,其中重要的是第4步subscribeActual
,这里才是真正做事的,之前都是数据的校验,因为我们这个类的实例是SingleJust,所以接下来看一下SingleJust的subscribeActual方法做了什么。
@Override
protected void subscribeActual(SingleObserver<? super T> observer)
observer.onSubscribe(Disposable.disposed());
observer.onSuccess(value);
可以看到内容十分简单
- 调用observer的onSubscribe方法,表明订阅成功,参数是Disposable.disposed()返回值
- 调用observer的onSuccess方法,表明数据回调成功,参数是value,而value就是通过Single的just函数传进来的,通过构造方法传入SingleJust实例中,因此,这一步的操作就是简单地将构造方法中传入的值,通过observer的onSuccess方法回调给我们定义的观察者SingleObserver。
这样就完事了,因为之前说过Single.just是最简单的RxJava使用方式,先调用onSubscribe表明注册监听,然后又紧接着通过onSuccess回调数据,所以不会有失败的情况。
2. map方法分析
map是Rxjava中比较常用的用法,用来实现数据类型的转换 比如像这样,我们发送的数据类型是Integer,接收的数据类型是String,这样当然是无法直接接收的,所以需要进行一下转换,将上游数据发送的Integer转换为String,然后由下游接收。
private fun testMap(view: View)
Single.just(123).map(object : Function<Int, String>
override fun apply(t: Int): String
return "$t"
).subscribe(object : SingleObserver<String>
override fun onSubscribe(d: Disposable)
Log.e(TAG, "onSubscribe")
override fun onSuccess(t: String)
Log.e(TAG, "onSuccess:$t")
override fun onError(e: Throwable)
Log.e(TAG, "onError:$e")
)
打印结果
021-12-18 22:32:02.958 5210-5210/com.hfhuaizhi.rxjavatest E/hftest: onSubscribe
2021-12-18 22:32:02.958 5210-5210/com.hfhuaizhi.rxjavatest E/hftest: onSuccess:123
just方法传入的123是Integer类型,onSuccess处接收的数据是String类型,通过map进行转换。其中map方法传入的参数是一个Function<T,E>,此类有两个泛型参数,T代表输入数据类型,E表示输出数据类型,这里的输入数据类型是Integer,返回类型是String,apply方法中返回了String类型的输出数据。
map(object : Function<Int, String>
override fun apply(t: Int): String
return "$t"
)
public final <@NonNull R> Single<R> map(@NonNull Function<? super T, ? extends R> mapper)
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new SingleMap<>(this, mapper));
进入map方法内部,此方法判空后,返回了SingleMap实例,其构造方法传入了当前SingleJust实例和mapper转换参数,并将其分别保存在source和mapper成员变量中。
public final class SingleMap<T, R> extends Single<R>
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper)
this.source = source;
this.mapper = mapper;
...
好,map方法暂且看到这儿,我们接下来继续分析链式调用中的subscribe方法。
subccribe传入了一个SingleObserver,和之前分析的类似,但是区别在于调用的不再是SingleJust的subscribe方法,而是map方法返回的SingleMap的subscribe方法,由之前的分析可知,此方法调用会在数据的判空后调用到SingleMap的subscribeActual
方法。 由之前的分析可知,链式调用到subscribe方法会调用到SingleMap的subscribeActual
方法
public final class SingleMap<T, R> extends Single<R>
final SingleSource<? extends T> source;
final Function<? super T, ? extends R> mapper;
public SingleMap(SingleSource<? extends T> source, Function<? super T, ? extends R> mapper)
this.source = source;
this.mapper = mapper;
@Override
protected void subscribeActual(final SingleObserver<? super R> t)
source.subscribe(new MapSingleObserver<T, R>(t, mapper));
...
由之前的分析可知,source就是map的上游SingleJust, 所以在single的实际subscribe方法中会调用其上游的subscribe方法,并传入了一个封装好的新的MapSingleObserver,MapSingleObserver的构造方法中第一个参数t,是下游观察者,在我们这块代码中就是链式调用的时候传入的SingleObserver。第二个参数是我们在map方法中传入的数据类型转换转换器mapper。 由之前的分析可知,当source,也就是SingleJust的subscribe方法调用后,会依次调用其参数传入的Observer的onSubscribe方法和onSuccess方法,此时参数传入的Observer就是上面代码块里的MapSingleObserver
static final class MapSingleObserver<T, R> implements SingleObserver<T>
final SingleObserver<? super R> t;
final Function<? super T, ? extends R> mapper;
MapSingleObserver(SingleObserver<? super R> t, Function<? super T, ? extends R> mapper)
this.t = t;
this.mapper = mapper;
@Override
public void onSubscribe(Disposable d)
t.onSubscribe(d);
@Override
public void onSuccess(T value)
R v;
try
v = Objects.requireNonNull(mapper.apply(value), "The mapper function returned a null value.");
catch (Throwable e)
Exceptions.throwIfFatal(e);
onError(e);
return;
t.onSuccess(v);
@Override
public void onError(Throwable e)
t.onError(e);
onSubscribe方法原封不动的调用了t.onSubscribe(d);而t就是在MapSingleObserver构造方法传入的下游观察者,也就是SingleObserver实例。这里直接调用了其onSubscribe方法表示注册监听成功。 onSuccess方法中调用了mapper.apply(value),这个mapper就是我们在map方法中传入的转换函数,这里输入了Integer数据类型,得到了String类型输出,最后调用t.onSuccess回调转换后的数据,也就是调用我们subscribe方法传入的实例的onSuccess。
map方法总结
map主要做的就是一个承上启下,链式调用中subscribe方法调用后,会依次向上调用中间节点的subscribe方法,直到调用到最初始的没有上游的Observable,最上层的Observable会在其subscribeActual方法中调用其下游观察者的onSubscribe和onSuccess/onError,将数据一层一层传下去,数据传递的过程中,中间节点可能会对数据进行处理后再接着向下传,最终传递到最底层的Observer,整个流程如图所示
图片含义解释
最上游的Single就是我们调用Single.just产生的SingleJust,其subscribe方法中会调用onSubscribe()和onSuccess(),向下方观察者传递Integer类型的结果,中间观察者SingleObserver由map方法创建,其接收到上游传递下来的数据后,将其转换为String,然后传递给下方观察者,最后下游收到的数据结果就是String类型。
3. 线程切换
线程切换可以说是RxJava中最常用的操作了,甚至很多人选择RxJava,就是因为RxJava可以和方便地实现线程切换。 线程切换主要用到这两个函数:
- subscribeOn
- observerOn
private fun testSubscribe(view: View)
Single.just("hfhuaizhi").subscribeOn(Schedulers.io())
.observeOn(androidSchedulers.mainThread()).subscribe(object : SingleObserver<String>
override fun onSubscribe(d: Disposable)
Log.e(TAG, "onSubscribe")
override fun onSuccess(t: String)
Log.e(TAG, "onSuccess:$t")
override fun onError(e: Throwable)
Log.e(TAG, "onError:$e")
)
这样写,可以实现subscribe调用之前的消息发送在io线程,observerOn调用之后的Observer回调在android主线程,其中AndroidSchedulers类不在Rxjava标准库中,需要额外引入RxAndroid依赖。
subscribeOn
public final Single<T> subscribeOn(@NonNull Scheduler scheduler)
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleSubscribeOn<>(this, scheduler));
subscribeOn方法返回一个SingleSubscribeOn实例,其构造方法中传入了this(上游被观察者)和scheduler(线程调度器,我们传入的是Schedulers.io())。 由之前的分析可知,链式调用中最终subscribe方法调用的时候,会由下向上依次调用各个节点的subscribe方法,这里我们看一下SingleSubscribeOn这一线程切换的节点的subscribe方法做了什么,因为SingleSubscribeOn和SingleJust一样继承自Single,其subscribe方法也是调用到了subscribeActual方法
public final class SingleSubscribeOn<T> extends Single<T>
final SingleSource<? extends T> source;
final Scheduler scheduler;
public SingleSubscribeOn(SingleSource<? extends T> source, Scheduler scheduler)
// 上层被观察对象
this.source = source;
// 线程类型
this.scheduler = scheduler;
@Override
protected void subscribeActual(final SingleObserver<? super T> observer)
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer, source);
observer.onSubscribe(parent);
Disposable f = scheduler.scheduleDirect(parent);
parent.task.replace(f);
-
将observer(下游观察者)和source(上游被观察者)封装进一个新的观察者SubscribeOnObserver
-
调用下游观察者的onSubscribe方法
-
调用scheduler的scheduleDirect方法,参数传入刚封装的新的观察者SubscribeOnObserver实例
-
将parent的task变量替换为由传入的scheduler生成的Disposable
final SequentialDisposable task;
- 这个task的参数类型是Disposable,之前有提到过,在Observer的onSubscribe方法中会传入一个Disposable,调用Disposable的dispose()方法后,会取消注册并让上游停止发送任务,这个Disposable继承自AtomicReference 实现了Disposable接口,AtomicReference是java里的原子引用类型,可以线程安全地对对象引用进行修改,类似地还有AtomicInteger等,所以这里的parent.task.replace(f)就是将parent中的task这个disposable线程安全地替换为scheduler创建地这个新的Disposable,从而可以实现任务的取消。
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable
...
接下来分析一下第3步主要做了什么
public Disposable scheduleDirect(@NonNull Runnable run)
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
scheduleDirect方法中传入了一个Runnable类型参数,因为SubscribeOnObserver类实现了Runnable接口,所以可以被当作Runnable传进去。
因为我们传入的scheduler参数是由Schedulers.io()方法创建的,而此方法默认会返回一个ioscheduler
这个Scheduler的注释写着,会创建并缓存一个线程池。所以我们知道了scheduleDirect方法会将传入的Runnable放入一个线程池里执行,从而实现任务的异步执行,所以接下来我们去看一下SubscribeOnObserver的run方法里做了什么。
static final class SubscribeOnObserver<T>
extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable
private static final long serialVersionUID = 7000911171163930287L;
final SingleObserver<? super T> downstream;
final SequentialDisposable task;
final SingleSource<? extends T> source;
SubscribeOnObserver(SingleObserver<? super T> actual, SingleSource<? extends T> source)
this.downstream = actual;
this.source = source;
this.task = new SequentialDisposable();
@Override
public void run()
source.subscribe(this);
@Override
public void onSubscribe(Disposable d)
DisposableHelper.setOnce(this, d);
@Override
public void onSuccess(T value)
downstream.onSuccess(value);
...
SubscribeOnObserver的run方法中会调用source.subscribe,并传入自己(自己也是一个Observer),由之前分析我们知道source就是我们监听的上游,这里调用了SingleJust的subscribe,由之前的分析我们知道subscribe会调用到subscribeActual
,这里做任务的真正执行,因此就这样实现了让上游任务在异步线程中的执行,上游任务执行过后,会将数据向下传递,传递到当前SubscribeOnObserver节点的时候会调用其onSuccess方法,其调用downstream,也就是下游观察者的onSuccess方法,将数据继续向下传递,此时数据传递的线程也是run方法执行的线程,因为此时并没有再次对线程进行切换。
observerOn
public final Single<T> observeOn(@NonNull Scheduler scheduler)
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new SingleObserveOn<>(this, scheduler));
observeOn函数返回了一个SingleObserveOn,也是需要传入this(上游被观察者),和scheduler(线程调度器类型,此时我们传入的是AndroidSchedulers.mainThread()),由之前分析可知我们此时应该去看SingleObserveOn的subscribeActual方法调用
protected void subscribeActual(final SingleObserver<? super T> observer)
source.subscribe(new ObserveOnSingleObserver<>(observer, scheduler));
此方法中调用了其上游的subscribe方法,和之前分析的数据流转过程一致,需要依次调用到最根节点的subscribe,参数传入的是封装后的观察者ObserveOnSingleObserver,其构造方法中传入了下游观察者和线程调度类型,接下来我们看一下当ObserveOnSingleObserver收到上游传下来的数据后进行了怎样的操作。
static final class ObserveOnSingleObserver<T> extends AtomicReference<Disposable>
implements SingleObserver<T>, Disposable, Runnable
private static final long serialVersionUID = 3528003840217436037L;
final SingleObserver<? super T> downstream;
final Scheduler scheduler;
T value;
Throwable error;
ObserveOnSingleObserver(SingleObserver<? super T> actual, Scheduler scheduler)
this.downstream = actual;
this.scheduler = scheduler;
@Override
public void onSubscribe(Disposable d)
if (DisposableHelper.setOnce(this, d))
downstream.onSubscribe(this);
@Override
public void onSuccess(T value)
this.value = value;
Disposable d = scheduler.scheduleDirect(this);
DisposableHelper.replace(this, d);
@Override
public void run()
Throwable ex = error;
if (ex != null)
downstream.onError(ex);
else
downstream.onSuccess(value);
...
可以看到在onSuccess方法中调用了scheduler.scheduleDirect(this),并穿了个this,而且自身实现了runnable接口,由之前分析可知,run方法会在某一时刻被调用。传入的scheduler是AndroidSchedulers.mainThread()
其返回的是HandlerScheduler
,其内部封装了个Handler,将Runnable 弄到主线程去执行。最终结果就是ObserveOnSingleObserver的run方法在主线程中被调用, 其run方法调用了下游观察者downstream的onSuccess/onError。 由此分析可知,observerOn方法控制此节点后的被观察者收到数据时所在的线程,无法影响其上游节点。
以上是关于RxJava原理解析的主要内容,如果未能解决你的问题,请参考以下文章
浅析RxJava 1.x&2.x版本区别及原理:maplift操作符源码解析