浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析
Posted 鸽一门
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析相关的知识,希望对你有一定的参考价值。
上两篇文章都是对比分析RxJava中较基本的订阅流程与操作,即Observable、Flowable等基本元素的源码,还有map、lift操作符的源码。在对Rxjava框架有了一个坚实的基础后,此篇文章将直袭Rxjava中最闪亮的Point,也是android猿平常在开发中经常遇到的需求 —— 线程切换,主线程中需要进行耗时操作,要求切换到子线程,子线程中需要进行UI更新,要切切换到主线程。以上切换原理如何?此篇将以揭晓。
(此系列文章重点在于分析源码,并不适合作为框架的初学篇,笔者建议熟悉其用法后再观此博客讨论学习。另此系列文章源码分析难度由浅至深,建议先学习理解之前的文章:
浅析RxJava 1.x&2.x版本使用区别及原理(一):Observable、Flowable等基本元素源码解析
浅析RxJava 1.x&2.x版本区别及原理(二):map、lift操作符源码解析
)
文章将首先分析线程调度者Scheduler的真正面目,了解线程切换的原理所在。再从subscribeOn
、observeOn
方法入手,知晓线程调度之间方法逻辑,结合此两点便可了解Rxjava框架的线程切换秘密。本篇涉及到的知识点有:
- 线程变换的基本概念及1.x 版本线程变换使用
- Scheduler源码及原理分析(主线程、子线程变换)
- subscribeOn、observeOn方法源码分析
一. RxJava 1.x Scheduler线程变换
首先介绍一下线程变换的基本概念:
- 让代码可以在不同的线程执行
- subscribeOn ——订阅时的线程
- observeOn —— 接收时的线程
- Scheduler —— 实际做线程变换
1. Scheduler线程变换使用
Observable.create(new Observable.OnSubscribe<String>()
@Override
public void call(Subscriber<? super String> subscriber)
if (!subscriber.isUnsubscribed())
System.out.println("currentThread:" + Thread.currentThread());
subscriber.onNext("test");
subscriber.onCompleted();
).
subscribeOn(Schedulers.newThread()).
observeOn(AndroidSchedulers.mainThread()).
subscribe(new Observer<String>()
@Override
public void onCompleted()
@Override
public void onError(Throwable e)
@Override
public void onNext(String s)
System.out.println("onNext:" + s + "currentThread:" + Thread.currentThread());
);
简单说明代码使用,除了一个常用的Observable创建create
、订阅subscribe
方法调用及实现外,此处在创建完Observable对象后运用了线程变换方法,即subscribeOn(Schedulers.newThread())
,指定订阅时的线程为子线程(可做一些耗时操作),observeOn(AndroidSchedulers.mainThread())
指定接收时的线程为主线程(可做一些UI变化)。
有了以上基本了解后,这里先提前列出RxJava 1.x版本中线程变换原理的重要元素:
- Scheduler调度者
- Operator操作符接口
- lift核心操作符
以上重点就在于Scheduler抽象类,它才是线程切换的调度者,涉及到的重要元素有:
- Scheduler抽象类
- Worker —– 真正做线程调度的类
- Action0 —— 在线程中执行的操作
- schedule —— 实际做线程调度的方法
2. Scheduler源码分析
- Scheduler调度者
找到Scheduler抽象类,查看其源码组成,如下:
可知Scheduler确实是一个抽象类,内有一个抽象方法createWorker()
,方法参数是Worker。主要是还有一个抽象内部类Worker,实现了Subscription接口,内部有不同参数重载的两个schedule
方法,方法中第一个参数的类型都是Action0。对比以上列举的重要元素,全部集齐!
同时也可以看出,虽然Scheduler抽象类是线程切换的调度者,但其内部只有一个抽象方法,因此幕后BOSS还是内部抽象类Worker,其内部的schedule
方法作用就是进行线程调度,参数Action0就是即将被调度的线程操作。
- 原理归纳
下面回到我们调用的线程方法源码上来,在分析之前还是抛出一些原理步骤归纳:
- 传入不同Scheduler来调度不同的线程
- 用Scheduler创建Worker来使用真正的线程池
- 传入具体操作Action0
- 通过
schedule
方法来实现调度
首先第一个步骤我们在调用subscribeOn(Schedulers.newThread())
、observeOn(AndroidSchedulers.mainThread())
线程切换的方法时已经使用到了,接着深入Schedulers源码,查看其第二个步骤具体实现。
注意:这里不要将Scheduler和Schedulers弄混淆,前者是线程调度者,而后者Schedulers是一个静态类,为了提供创建Scheduler对象方法的工厂模式。
- Schedulers
首先映入眼帘的就是Schedulers类三个Scheduler类型成员变量,从其变量名可知分别运用于不同的场景:计算调度、io读写调度、新线程调度,结合之前对Schedulers介绍,它内部采用了工厂模式来创建不同类型的Scheduler,供开发者使用。
我们使用的是第三个newThreadScheduler,查看红框的创建源码可知,第一次调用是nt变量为null,由RxJavaSchedulersHook的createNewThreadScheduler()
方法创建newThreadScheduler,继而查看其方法实现:
可见此方法内部调用了另外一个参数重载的createNewThreadScheduler
方法,创建了一个RxThreadFactory对象实例作为参数传入。细心的你会发现创建RxThreadFactory时传入了“RxNewThreadScheduler-”字符串,而在一开始展示的例子输出订阅时所在的线程Log中也包含了此字符串。
继续查看下面重载的方法,它创建了一个NewThreadScheduler实例,并将方法参数RxThreadFactory传入其构造方法中,最后返回。重点就是NewThreadScheduler类,查看其详细实现:
- NewThreadScheduler
如上,之前在介绍Scheduler抽象类时,内部一个创建Worker的抽象方法createWorker
,现在找到了其继承类——NewThreadScheduler,重点查看它实现的createWorker
方法,创建了一个NewThreadWorker对象返回,将成员变量NewThreadWorker传给构造方法。再来查看NewThreadWorker源码:
- NewThreadWorker
NewThreadWorker类中代码太多,这里先讲解重点部分。如上图,此类必然是继承于Worker抽象类,同时又实现了Subscription接口。注意其成员变量红框部分,既然Worker的继承类是真正实现线程调度的BOSS,其过程必定会有线程池的参与,其成员变量EXECUTORS正好印证,后续再讲。NewThreadWorker既然继承于抽象类Worker,必定实现了重要抽象方法schedule
方法,查看其源码:
schedule
方法内部最终调用的还是重载的多参数方法,查看多参数的schedule
方法,发现其内部又调用了subscribeActual
方法,并传入具体操作Action0参数,其方法名暗示也很明显了,来查看这个“真正调度”方法的实现:
subscribeActual
方法的后两个参数指定延长时间,重点放到具体操作Action0参数。首先第一行代码RxJavaHooks.onScheduledAction(action)
就是返回参数action,赋值给成员变量decoratedAction;下一步又使用ScheduledAction对Action0进行了一次包装,返回ScheduledAction类型变量run,后面直接将这个变量run传入executor线程池去运行了。
以上是这个重点方法的大致逻辑,再来详细查看细节部分,我们似乎还不太清楚ScheduledAction类型变量run的真面目,但说到executor线程池直接去运行run了,这结果已经不言而喻了。笔者抢答:线程池运行的肯定是线程啊,莫非ScheduledAction继承于Thread?好像离真相越来越近了,先来查看ScheduledAction的真面目:
果不其然,不过ScheduledAction是实现的Runnable接口,其run
方法中调用了Action0接口的call
方法。有了简单了解后再回到subscribeActual
方法,可得知线程池运行这个Runnable,最终调用其run
方法,但真正核心是调用Action0接口的call
方法,它也是线程调度的原理。
- 小结
至此,再回到源码分析中一开始列举的原理四步骤:
- 第一步已在我们调用线程调度方法时实现了,即传入不同Scheduler来调度不同的线程。
- 深入Scheduler发现其内部只有一个
createWorker
方法和内部抽象类Worker,Scheduler调度者的幕后是创建创建Worker来使用真正的线程池,即第二步总结。 - Worker抽象类内部有
schedule
调度方法,其参数Action0就是具体的操作,即第三步总结。 - 最终Worker的子类实现了
schedule
调度方法,内部就是通过线程池运行Runnable,其run
方法核心就是调用Action0接口的call
方法,来实现线程调度!
3. Android中的Scheduler
我们在代码中通过subscribeOn(Schedulers.newThread())
来指定订阅时的子线程,以上已介绍完Schedulers。又通过observeOn(AndroidSchedulers.mainThread())
指定接收数据时的主线程,来查看AndroidSchedulers的详细实现。
还是先预告其原理核心:通过Handler和Looper来实现执行在主线程。
AndroidSchedulers不同于Schedulers内部提供了3个Scheduler调度者(计算、IO、新线程)选择,此处只有一个主线程mainThreadScheduler。我们是通过调用此类的mainThread()
方法获取Scheduler返回,查看其详细实现,最终创建Scheduler的地方是其构造方法红框处!
真相大白,这里创建了一个LooperScheduler对象实例,并将主线程的Looper传入到构造方法中。笔者看到Looper真是觉得倍感亲切,之前一篇文章Android 消息机制:Handler、Looper、Message源码 详细版解析 ——从入门到升天分析过,Android的主线程就是通过Handler、Looper、Message来处理消息,因此Looper都出现了,Handler还会远吗~来查看LooperScheduler实现:
- LooperScheduler
果不其然,LooperScheduler的内部唯一成员变量就是Handler,通过创建LooperScheduler时传入的主线程Looper,生成Handler。且LooperScheduler继承了Scheduler,查看重点实现创建Worker的createWorker()
方法,创建了一个继承于Worker的HandlerWorker类,并传入Handler至其构造方法。来查看HandlerWorker类实现:
- HandlerWorker
(注意:Looper、Handler都已出现,期待已久的Message即将登场~)
首先HandlerWorker类构造方法为成员变量Handler赋值,再看最终重载的schedule
方法,重点在于红框部分。笔者露出了会心一笑,这些Handler发送Message部分相信Android猿们应当再熟悉不过了。
- 首先创建包装了Action0和handler的ScheduledAction,之前已经介绍过它实现的Runnable接口,其run方法中调用了Action0接口的call方法。
- 接着封装了包含ScheduledAction的一个Message对象。
- 通过主线程的Handler发送该Message,发送到的线程就是MainLooper主线程。
- 过程结束,当MainLooper主线程收到该消息后,会执行其
run
方法,其中包含着线程调度的核心,也就是Action0的call
方法最终被执行。
3. Schedulers和AndroidSchedulers小结
- 方法:
- Schedulers: 指定订阅时的子线程
subscribeOn(Schedulers.newThread())
中的Schedulers - AndroidSchedulers:指定接收数据时的主线程
observeOn(AndroidSchedulers.mainThread())
的AndroidSchedulers
- Schedulers: 指定订阅时的子线程
- 线程切换机制:
- Schedulers: Java机制中的线程池执行封装过Action0接口的Runnable进行线程调度,关键处是Action0接口的
call
方法。 - AndroidSchedulers: Android中的消息机制,通过获取主线程的Looper,再创建与之对应的handler,将Action0接口封装到Message中,使用主线程的handler发送。当主线程的Looper收到消息后最终执行Action0接口的
call
方法,完成线程调度。
- Schedulers: Java机制中的线程池执行封装过Action0接口的Runnable进行线程调度,关键处是Action0接口的
二. RxJava 1.x subscribeOn、observeOn原理分析
1 subscribeOn原理
前面部分讲解都是subscribeOn(Schedulers.newThread())
参数部分,理解了大致的线程变换机制和Action0接口的重要性,在此回到一开始展示的基本使用代码,通过subscribeOn
方法指定订阅时的线程,内部具体原理如何?
此节将深入分析subscribeOn
方法源码,在此之前还是先列举出其原理步骤:
- 通过OnSubscribe接口来处理
- 利用Scheduler将发出动作Action0放到线程中执行
如上subscribeOn
源码,最终返回了一个新的Observable对象,而方法内部调用了Observable的create
方法来创建实例,此方法需要传入OnSubscribe接口参数,而源码中是创建了一个OperatorSubscribeOn实例,并传入旧的Observable实例和Scheduler实例。相必OperatorSubscribeOn实现了OnSubscribe接口,查看其实现:
果然如此!当我们再调用subscribeOn(Schedulers.newThread())
取指定订阅时的子线程,调用之后会重新返回一个新的Observable,并创建一个新的OnSubscribe接口。这也就意味着基本元素订阅原理的最后一步,即调用OnSubscribe接口的call
方法并传入Subscriber时,此OnSubscribe接口是新的OnSubscribe接口,而并非是我们调用create
方法创建Observable传入的OnSubscribe接口。
继续查看其重点方法call
,调用scheduler.createWorker()
创建了一个线程调度对象Worker,调用schedule
方法进行线程调度,意味着schedule
方法中的参数——实现的Action0接口中的call
逻辑在Worker调度的线程中执行! 而一开始我们调用subscribeOn(Schedulers.newThread())
传入的是newThread,因此Action0接口中的call
逻辑在newThread子线程中执行!
继续查看其重点方法call
的参数Subscriber类型,而在newThread子线程中执行的Action0接口中的call
方法,对Subscriber重新进行了一次简单封装在实现对应的onNext
、onComplete
方法内部调用的还是的旧Subscriber实例,最后一行则是调用Observable的subscribe
订阅方法,传入新的Subscriber实例。这也就意味着我们调用create
方法创建Observable传入的OnSubscribe接口中call
方法内对Subscribe的逻辑操作,例如调用Subscribe的onNext
方法,这些发起的操作被执行的线程是子线程。
以上方法印证了先前列举的原理步骤中的第二步:OperatorSubscribeOn类中的call
方法内部利用Scheduler将发出动作Action0放到线程中执行。
最后再总结一下OperatorSubscribeOn类中的call
方法内部逻辑:
- 获取调度者Worker
- 调用Worker的调度方法
schedule
,由于之前传入的参数是newThread,因此Action0接口中的所有逻辑操作都是在子线程newThread中执行。 - Action0接口中的
call
方法重新封装了Subscribe类并传入,这也就意味着调用Subscribe的onNext
等方法这些发起操作时,执行的线程实在子线程中!这也是为何可以将发出的动作被执行于子线程的核心所在。
2. observeOn原理
此节将深入分析observeOn方法源码,在此之前还是先列举出其原理步骤:
- 采用lift操作符(代理模式)
- 通过Subscriber来处理
- 利用Scheduler将发出动作Action0放到线程中执行
此重载方法最终调用的是如上,内部采用了lift操作符,真相已然明确一半,在上一篇文章中分析过:lift操作符创建了并返回了一个类似于代理的Observable,来接收原Observable发起的数据,然后在Operator中重新包装了一个新的Subscriber实例返回,此实例中预先对数据做一些处理后传递并调用原Subscriber的onNext
等方法。
此处的observeOn亦同理,创建了一个代理的Observable,并创建一个代理观察者接受上一级Observable的数据,代理观察者收到数据之后会开启一个线程,在新的线程中,调用下一级观察者的
onNext
、onCompete
、onError
方法。
来查看observeOn操作符的源码:
如上图红框所示,其逻辑与上一篇文章中lift操作符原理完全类似!只是此处多了一个逻辑处理,即对线程切换的处理。继续查看封装后的Subscriber实现,是如何处理线程切换,ObserveOnSubscriber源码如下:
//代理观察者
private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0
final Subscriber<? super T> child;
final Scheduler.Worker recursiveScheduler;
final NotificationLite<T> on;
final Queue<Object> queue;
//接收上一级Observable发出的数据
@Override
public void onNext(final T t)
if (isUnsubscribed() || finished)
return;
if (!queue.offer(on.next(t)))
onError(new MissingBackpressureException());
return;
schedule();
......
//线程切换处理数据⭐️⭐️⭐️⭐️⭐️
protected void schedule()
if (counter.getAndIncrement() == 0)
recursiveScheduler.schedule(this);//调用Worker调度者的schedule方法切换线程!
//在新的线程中将数据发送给目标观察者
@Override
public void call()
long missed = 1L;
long currentEmission = emitted;
final Queue<Object> q = this.queue;
final Subscriber<? super T> localChild = this.child;
final NotificationLite<T> localOn = this.on;
for (;;)
while (requestAmount != currentEmission)
...
localChild.onNext(localOn.getValue(v));
如上,代码中的注释已做简单分析,observeOn
方法处理线程调度的主要逻辑与subscribeOn
相同,都是依赖Worker类的schedule方法切换线程,根据不同实现类型的Woker(例如子线程NewThreadWorker、主线程HandlerWorker)切换到与之对应的线程,只不过封装的调用线程切换方法的对象不同(前者是subscribeOn接口,后者是observeOn类)。
3. subscribeOn与observeOn原理小结
- 调度的线程:
subscribeOn
方法:创建Observable时实现的call
方法中调用Subscriber方法,这些发起的动作执行于调用subscribeOn
方法时传入的指定线程(如子线程newThread、ioThread、computationThread)。observeOn
方法: 调用subscribe
传入的observer接口中onNext
、onComplete
、onError
的逻辑处理,被执行于调用observeOn
方法时传入的指定线程(如Android主线程)。
- 处理线程切换而包装的实例:
subscribeOn
方法: 是包装原OnSubscribe过后的OperatorSubscribeOn,在继承的call
方法中调用Worker的scheduler
方法来切换线程。observeOn
方法: 是包装原Subscribe过后的ObserveOnSubscriber,在实现的onNext
、onComplete
、onError
中调用Worker的scheduler
方法来切换线程。
- 调度者:
subscribeOn
方法: Schedulers(例如NewThreadScheduler中的NewThreadWorker)observeOn
方法: LooperScheduler中的HandlerWorker
前两篇文章都是先讲解Rxjava 1.x版本的源码分析,再讲解2.x 版本这样对比着来的。介于此部分篇幅较长,将其拆分成两篇文章,下一篇分析2.x 版本线程变换机制。
若有错误,虚心指教~
以上是关于浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析的主要内容,如果未能解决你的问题,请参考以下文章
浅析RxJava 1.x&2.x版本使用区别及原理:ObservableFlowable等基本元素源码解析
浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析
浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析
浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析