浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

Posted 鸽一门

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析相关的知识,希望对你有一定的参考价值。

上两篇文章都是对比分析RxJava中较基本的订阅流程与操作,即Observable、Flowable等基本元素的源码,还有map、lift操作符的源码。在对Rxjava框架有了一个坚实的基础后,此篇文章将直袭Rxjava中最闪亮的Point,也是android猿平常在开发中经常遇到的需求 —— 线程切换,主线程中需要进行耗时操作,要求切换到子线程,子线程中需要进行UI更新,要切切换到主线程。以上切换原理如何?此篇将以揭晓。

(此系列文章重点在于分析源码,并不适合作为框架的初学篇,笔者建议熟悉其用法后再观此博客讨论学习。另此系列文章源码分析难度由浅至深,建议先学习理解之前的文章:
浅析RxJava 1.x&2.x版本使用区别及原理(一):Observable、Flowable等基本元素源码解析
浅析RxJava 1.x&2.x版本区别及原理(二):map、lift操作符源码解析

文章将首先分析线程调度者Scheduler的真正面目,了解线程切换的原理所在。再从subscribeOnobserveOn方法入手,知晓线程调度之间方法逻辑,结合此两点便可了解Rxjava框架的线程切换秘密。本篇涉及到的知识点有:

  • 线程变换的基本概念及1.x 版本线程变换使用
  • Scheduler源码及原理分析(主线程、子线程变换)
  • subscribeOn、observeOn方法源码分析

一. RxJava 1.x Scheduler线程变换

首先介绍一下线程变换的基本概念:

  • 让代码可以在不同的线程执行
  • subscribeOn ——订阅时的线程
  • observeOn —— 接收时的线程
  • Scheduler —— 实际做线程变换

1. Scheduler线程变换使用

Observable.create(new Observable.OnSubscribe<String>() 
                            @Override
                            public void call(Subscriber<? super String> subscriber) 
                                if (!subscriber.isUnsubscribed()) 
                                    System.out.println("currentThread:" + Thread.currentThread());
                                    subscriber.onNext("test");
                                    subscriber.onCompleted();
                                
                            
                        ).
                        subscribeOn(Schedulers.newThread()).
                        observeOn(AndroidSchedulers.mainThread()).
                        subscribe(new Observer<String>() 
                            @Override
                            public void onCompleted() 

                            

                            @Override
                            public void onError(Throwable e) 

                            

                            @Override
                            public void onNext(String s) 
                                System.out.println("onNext:" + s + "currentThread:" + Thread.currentThread());
                            
                        );

简单说明代码使用,除了一个常用的Observable创建create、订阅subscribe方法调用及实现外,此处在创建完Observable对象后运用了线程变换方法,即subscribeOn(Schedulers.newThread()),指定订阅时的线程为子线程(可做一些耗时操作),observeOn(AndroidSchedulers.mainThread())指定接收时的线程为主线程(可做一些UI变化)。

有了以上基本了解后,这里先提前列出RxJava 1.x版本中线程变换原理的重要元素:

  • Scheduler调度者
  • Operator操作符接口
  • lift核心操作符

以上重点就在于Scheduler抽象类,它才是线程切换的调度者,涉及到的重要元素有:

  • Scheduler抽象类
  • Worker —– 真正做线程调度的类
  • Action0 —— 在线程中执行的操作
  • schedule —— 实际做线程调度的方法


2. Scheduler源码分析

  • Scheduler调度者

找到Scheduler抽象类,查看其源码组成,如下:

可知Scheduler确实是一个抽象类,内有一个抽象方法createWorker(),方法参数是Worker。主要是还有一个抽象内部类Worker,实现了Subscription接口,内部有不同参数重载的两个schedule方法,方法中第一个参数的类型都是Action0。对比以上列举的重要元素,全部集齐!

同时也可以看出,虽然Scheduler抽象类是线程切换的调度者,但其内部只有一个抽象方法,因此幕后BOSS还是内部抽象类Worker,其内部的schedule方法作用就是进行线程调度,参数Action0就是即将被调度的线程操作。


  • 原理归纳

下面回到我们调用的线程方法源码上来,在分析之前还是抛出一些原理步骤归纳:

  1. 传入不同Scheduler来调度不同的线程
  2. 用Scheduler创建Worker来使用真正的线程池
  3. 传入具体操作Action0
  4. 通过schedule方法来实现调度

首先第一个步骤我们在调用subscribeOn(Schedulers.newThread())observeOn(AndroidSchedulers.mainThread())线程切换的方法时已经使用到了,接着深入Schedulers源码,查看其第二个步骤具体实现。

注意:这里不要将Scheduler和Schedulers弄混淆,前者是线程调度者,而后者Schedulers是一个静态类,为了提供创建Scheduler对象方法的工厂模式。


  • Schedulers

首先映入眼帘的就是Schedulers类三个Scheduler类型成员变量,从其变量名可知分别运用于不同的场景:计算调度、io读写调度、新线程调度,结合之前对Schedulers介绍,它内部采用了工厂模式来创建不同类型的Scheduler,供开发者使用。

我们使用的是第三个newThreadScheduler,查看红框的创建源码可知,第一次调用是nt变量为null,由RxJavaSchedulersHook的createNewThreadScheduler()方法创建newThreadScheduler,继而查看其方法实现:

可见此方法内部调用了另外一个参数重载的createNewThreadScheduler方法,创建了一个RxThreadFactory对象实例作为参数传入。细心的你会发现创建RxThreadFactory时传入了“RxNewThreadScheduler-”字符串,而在一开始展示的例子输出订阅时所在的线程Log中也包含了此字符串。

继续查看下面重载的方法,它创建了一个NewThreadScheduler实例,并将方法参数RxThreadFactory传入其构造方法中,最后返回。重点就是NewThreadScheduler类,查看其详细实现:

  • NewThreadScheduler

如上,之前在介绍Scheduler抽象类时,内部一个创建Worker的抽象方法createWorker,现在找到了其继承类——NewThreadScheduler,重点查看它实现的createWorker方法,创建了一个NewThreadWorker对象返回,将成员变量NewThreadWorker传给构造方法。再来查看NewThreadWorker源码:

  • NewThreadWorker

NewThreadWorker类中代码太多,这里先讲解重点部分。如上图,此类必然是继承于Worker抽象类,同时又实现了Subscription接口。注意其成员变量红框部分,既然Worker的继承类是真正实现线程调度的BOSS,其过程必定会有线程池的参与,其成员变量EXECUTORS正好印证,后续再讲。NewThreadWorker既然继承于抽象类Worker,必定实现了重要抽象方法schedule方法,查看其源码:

schedule方法内部最终调用的还是重载的多参数方法,查看多参数的schedule方法,发现其内部又调用了subscribeActual方法,并传入具体操作Action0参数,其方法名暗示也很明显了,来查看这个“真正调度”方法的实现:

subscribeActual方法的后两个参数指定延长时间,重点放到具体操作Action0参数。首先第一行代码RxJavaHooks.onScheduledAction(action)就是返回参数action,赋值给成员变量decoratedAction;下一步又使用ScheduledAction对Action0进行了一次包装,返回ScheduledAction类型变量run,后面直接将这个变量run传入executor线程池去运行了。

以上是这个重点方法的大致逻辑,再来详细查看细节部分,我们似乎还不太清楚ScheduledAction类型变量run的真面目,但说到executor线程池直接去运行run了,这结果已经不言而喻了。笔者抢答:线程池运行的肯定是线程啊,莫非ScheduledAction继承于Thread?好像离真相越来越近了,先来查看ScheduledAction的真面目:

果不其然,不过ScheduledAction是实现的Runnable接口,其run方法中调用了Action0接口的call方法。有了简单了解后再回到subscribeActual方法,可得知线程池运行这个Runnable,最终调用其run方法,但真正核心是调用Action0接口的call方法,它也是线程调度的原理。


  • 小结

至此,再回到源码分析中一开始列举的原理四步骤:

  1. 第一步已在我们调用线程调度方法时实现了,即传入不同Scheduler来调度不同的线程
  2. 深入Scheduler发现其内部只有一个createWorker方法和内部抽象类Worker,Scheduler调度者的幕后是创建创建Worker来使用真正的线程池,即第二步总结。
  3. Worker抽象类内部有schedule调度方法,其参数Action0就是具体的操作,即第三步总结。
  4. 最终Worker的子类实现了schedule调度方法,内部就是通过线程池运行Runnable,其run方法核心就是调用Action0接口的call方法,来实现线程调度!


3. Android中的Scheduler

我们在代码中通过subscribeOn(Schedulers.newThread())来指定订阅时的子线程,以上已介绍完Schedulers。又通过observeOn(AndroidSchedulers.mainThread())指定接收数据时的主线程,来查看AndroidSchedulers的详细实现。

还是先预告其原理核心:通过Handler和Looper来实现执行在主线程。

AndroidSchedulers不同于Schedulers内部提供了3个Scheduler调度者(计算、IO、新线程)选择,此处只有一个主线程mainThreadScheduler。我们是通过调用此类的mainThread()方法获取Scheduler返回,查看其详细实现,最终创建Scheduler的地方是其构造方法红框处!

真相大白,这里创建了一个LooperScheduler对象实例,并将主线程的Looper传入到构造方法中。笔者看到Looper真是觉得倍感亲切,之前一篇文章Android 消息机制:Handler、Looper、Message源码 详细版解析 ——从入门到升天分析过,Android的主线程就是通过Handler、Looper、Message来处理消息,因此Looper都出现了,Handler还会远吗~来查看LooperScheduler实现:

  • LooperScheduler

果不其然,LooperScheduler的内部唯一成员变量就是Handler,通过创建LooperScheduler时传入的主线程Looper,生成Handler。且LooperScheduler继承了Scheduler,查看重点实现创建Worker的createWorker() 方法,创建了一个继承于Worker的HandlerWorker类,并传入Handler至其构造方法。来查看HandlerWorker类实现:

  • HandlerWorker

(注意:Looper、Handler都已出现,期待已久的Message即将登场~)

首先HandlerWorker类构造方法为成员变量Handler赋值,再看最终重载的schedule方法,重点在于红框部分。笔者露出了会心一笑,这些Handler发送Message部分相信Android猿们应当再熟悉不过了。

  1. 首先创建包装了Action0和handler的ScheduledAction,之前已经介绍过它实现的Runnable接口,其run方法中调用了Action0接口的call方法。
  2. 接着封装了包含ScheduledAction的一个Message对象。
  3. 通过主线程的Handler发送该Message,发送到的线程就是MainLooper主线程。
  4. 过程结束,当MainLooper主线程收到该消息后,会执行其run方法,其中包含着线程调度的核心,也就是Action0的call方法最终被执行。


3. Schedulers和AndroidSchedulers小结

  • 方法:
    • Schedulers: 指定订阅时的子线程subscribeOn(Schedulers.newThread())中的Schedulers
    • AndroidSchedulers:指定接收数据时的主线程 observeOn(AndroidSchedulers.mainThread())的AndroidSchedulers
  • 线程切换机制:
    • Schedulers: Java机制中的线程池执行封装过Action0接口的Runnable进行线程调度,关键处是Action0接口的call方法。
    • AndroidSchedulers: Android中的消息机制,通过获取主线程的Looper,再创建与之对应的handler,将Action0接口封装到Message中,使用主线程的handler发送。当主线程的Looper收到消息后最终执行Action0接口的call方法,完成线程调度。




二. RxJava 1.x subscribeOn、observeOn原理分析

1 subscribeOn原理

前面部分讲解都是subscribeOn(Schedulers.newThread())参数部分,理解了大致的线程变换机制和Action0接口的重要性,在此回到一开始展示的基本使用代码,通过subscribeOn方法指定订阅时的线程,内部具体原理如何?

此节将深入分析subscribeOn方法源码,在此之前还是先列举出其原理步骤:

  1. 通过OnSubscribe接口来处理
  2. 利用Scheduler将发出动作Action0放到线程中执行

如上subscribeOn源码,最终返回了一个新的Observable对象,而方法内部调用了Observable的create方法来创建实例,此方法需要传入OnSubscribe接口参数,而源码中是创建了一个OperatorSubscribeOn实例,并传入旧的Observable实例和Scheduler实例。相必OperatorSubscribeOn实现了OnSubscribe接口,查看其实现:

果然如此!当我们再调用subscribeOn(Schedulers.newThread())取指定订阅时的子线程,调用之后会重新返回一个新的Observable,并创建一个新的OnSubscribe接口。这也就意味着基本元素订阅原理的最后一步,即调用OnSubscribe接口的call方法并传入Subscriber时,此OnSubscribe接口是新的OnSubscribe接口,而并非是我们调用create方法创建Observable传入的OnSubscribe接口。

继续查看其重点方法call,调用scheduler.createWorker()创建了一个线程调度对象Worker,调用schedule方法进行线程调度,意味着schedule方法中的参数——实现的Action0接口中的call逻辑在Worker调度的线程中执行! 而一开始我们调用subscribeOn(Schedulers.newThread())传入的是newThread,因此Action0接口中的call逻辑在newThread子线程中执行!

继续查看其重点方法call的参数Subscriber类型,而在newThread子线程中执行的Action0接口中的call方法,对Subscriber重新进行了一次简单封装在实现对应的onNextonComplete方法内部调用的还是的旧Subscriber实例,最后一行则是调用Observable的subscribe订阅方法,传入新的Subscriber实例。这也就意味着我们调用create方法创建Observable传入的OnSubscribe接口中call方法内对Subscribe的逻辑操作,例如调用Subscribe的onNext方法,这些发起的操作被执行的线程是子线程。

以上方法印证了先前列举的原理步骤中的第二步:OperatorSubscribeOn类中的call方法内部利用Scheduler将发出动作Action0放到线程中执行

最后再总结一下OperatorSubscribeOn类中的call方法内部逻辑:

  • 获取调度者Worker
  • 调用Worker的调度方法schedule,由于之前传入的参数是newThread,因此Action0接口中的所有逻辑操作都是在子线程newThread中执行。
  • Action0接口中的call方法重新封装了Subscribe类并传入,这也就意味着调用Subscribe的onNext等方法这些发起操作时,执行的线程实在子线程中!这也是为何可以将发出的动作被执行于子线程的核心所在。



2. observeOn原理

此节将深入分析observeOn方法源码,在此之前还是先列举出其原理步骤:

  • 采用lift操作符(代理模式)
  • 通过Subscriber来处理
  • 利用Scheduler将发出动作Action0放到线程中执行

此重载方法最终调用的是如上,内部采用了lift操作符,真相已然明确一半,在上一篇文章中分析过:lift操作符创建了并返回了一个类似于代理的Observable,来接收原Observable发起的数据,然后在Operator中重新包装了一个新的Subscriber实例返回,此实例中预先对数据做一些处理后传递并调用原Subscriber的onNext等方法。

此处的observeOn亦同理,创建了一个代理的Observable,并创建一个代理观察者接受上一级Observable的数据,代理观察者收到数据之后会开启一个线程,在新的线程中,调用下一级观察者的onNextonCompeteonError方法。

来查看observeOn操作符的源码:

如上图红框所示,其逻辑与上一篇文章中lift操作符原理完全类似!只是此处多了一个逻辑处理,即对线程切换的处理。继续查看封装后的Subscriber实现,是如何处理线程切换,ObserveOnSubscriber源码如下:

    //代理观察者
    private static final class ObserveOnSubscriber<T> extends Subscriber<T> implements Action0 
        final Subscriber<? super T> child;
        final Scheduler.Worker recursiveScheduler;
        final NotificationLite<T> on;
        final Queue<Object> queue;
        //接收上一级Observable发出的数据
        @Override
        public void onNext(final T t) 
            if (isUnsubscribed() || finished) 
                return;
            
            if (!queue.offer(on.next(t))) 
                onError(new MissingBackpressureException());
                return;
            
            schedule();
        
        ......
        //线程切换处理数据⭐️⭐️⭐️⭐️⭐️
        protected void schedule() 
            if (counter.getAndIncrement() == 0) 
                recursiveScheduler.schedule(this);//调用Worker调度者的schedule方法切换线程!
            
        
        //在新的线程中将数据发送给目标观察者
        @Override
        public void call() 
            long missed = 1L;
            long currentEmission = emitted;
            final Queue<Object> q = this.queue;
            final Subscriber<? super T> localChild = this.child;
            final NotificationLite<T> localOn = this.on;
            for (;;) 
                while (requestAmount != currentEmission) 
                    ...
                    localChild.onNext(localOn.getValue(v));
                
            
        
    

如上,代码中的注释已做简单分析,observeOn方法处理线程调度的主要逻辑与subscribeOn相同,都是依赖Worker类的schedule方法切换线程,根据不同实现类型的Woker(例如子线程NewThreadWorker、主线程HandlerWorker)切换到与之对应的线程,只不过封装的调用线程切换方法的对象不同(前者是subscribeOn接口,后者是observeOn类)。



3. subscribeOn与observeOn原理小结

  • 调度的线程:
    • subscribeOn方法:创建Observable时实现的call方法中调用Subscriber方法,这些发起的动作执行于调用subscribeOn方法时传入的指定线程(如子线程newThread、ioThread、computationThread)。
    • observeOn方法: 调用subscribe传入的observer接口中onNextonCompleteonError的逻辑处理,被执行于调用observeOn方法时传入的指定线程(如Android主线程)。
  • 处理线程切换而包装的实例:
    • subscribeOn方法: 是包装原OnSubscribe过后的OperatorSubscribeOn,在继承的call方法中调用Worker的scheduler方法来切换线程。
    • observeOn方法: 是包装原Subscribe过后的ObserveOnSubscriber,在实现的onNextonCompleteonError中调用Worker的scheduler方法来切换线程。
  • 调度者:
    • subscribeOn方法: Schedulers(例如NewThreadScheduler中的NewThreadWorker)
    • observeOn方法: LooperScheduler中的HandlerWorker




前两篇文章都是先讲解Rxjava 1.x版本的源码分析,再讲解2.x 版本这样对比着来的。介于此部分篇幅较长,将其拆分成两篇文章,下一篇分析2.x 版本线程变换机制。

若有错误,虚心指教~

以上是关于浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析的主要内容,如果未能解决你的问题,请参考以下文章

浅析RxJava 1.x&2.x版本使用区别及原理:ObservableFlowable等基本元素源码解析

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

浅析RxJava 1.x&2.x版本使用区别及原理:ObservableFlowable等基本元素源码解析

Android异步框架RxJava 1.x系列 - 观察者模式及实现