浅析RxJava 1.x&2.x版本区别及原理:maplift操作符源码解析

Posted 鸽一门

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了浅析RxJava 1.x&2.x版本区别及原理:maplift操作符源码解析相关的知识,希望对你有一定的参考价值。

上一篇文章讲解了RxJava 1.x&2.x版本的基本元素(Observable、Subscriber、Flowable、Observer等)流程源码分析,对RxJava两个版本的基本使用元素和订阅原理有了一定的认识后,此篇文章将重点置于map、lift操作符,已知操作符的作用是将发出的数据进行处理再发送,RxJava源码内部具体操作原理如何?来一探究竟。

此系列文章重点在于分析源码,并不适合作为框架的初学篇,笔者建议熟悉其用法后再观此博客讨论学习。另此系列文章源码分析难度由浅至深,建议先学习理解之前的文章:

浅析RxJava 1.x&2.x版本使用区别及原理(一):Observable、Flowable等基本元素源码解析

本篇涉及到的知识点有:

  • RxJava 1.x map、lift使用方法及核心源码解析
  • RxJava 2.x map、lift使用方法及核心源码解析
  • 两个版本中map、lift操作符原理区别

一. RxJava 1.x操作符

1. map操作符使用

Observable.create(new Observable.OnSubscribe<String>() 
                            @Override
                            public void call(Subscriber<? super String> subscriber) 
                                if (!subscriber.isUnsubscribed()) 
                                    subscriber.onNext("1");
                                    subscriber.onNext("2");
                                    subscriber.onCompleted();
                                
                            
                        ).
                        map(new Func1<String, Integer>() 
                            @Override
                            public Integer call(String s) 
                                return Integer.parseInt(s);
                            
                        ).
                        subscribe(new Observer<Integer>() 
                            @Override
                            public void onCompleted() 
                                System.out.println("onCompleted");
                            
                            @Override
                            public void onError(Throwable e) 
                            
                            @Override
                            public void onNext(Integer integer) 
                                System.out.println("onNext:" + integer + ",integer instanceOf" + integer.getClass());
                            
                        );

在详细阅读此系列的第一篇文章的基本元素源码解说后,以上RxJava1.x 的基本订阅流程可以说是相当熟悉了。除了一个常用的Observable创建create、订阅subscribe方法调用及实现外,此处运用了操作符,即调用完create方法后的map方法。

整个流程可以解释为:在创建Observable时实现的OnSubscribe方法中进行数据传递,map方法会接收到数据根据自身逻辑转化数据(此处是将String类型转换为int类型),再传送给Observer接口中的onNext方法。

RxJava 1.x版本操作符涉及到的元素如下:

  • Func1接口
  • Operator接口

正式源码分析之前,先提前预告lift操作符原理带着微微印象与了解开始解析。步骤如下:

  1. 接收原OnSubscribe和当前的Operator
  2. 创建一个新的OnSubscribe并返回新的Observable
  3. 用新的Subscriber包裹旧的Subscriber
  4. 在新的Subscriber里做完变换再传给旧的Subscribe


2. map源码分析

  • map方法

在有了上一篇博客基本元素源码解析的基础后,直捣黄龙,查看map方法的具体实现:

  • lift方法

可见,map方法中调用的是lift方法,由它处理主要逻辑,并且将我们实现的Func1接口通过OperatorMap封装后传入。再来查看 lift方法:

注意,此方法返回的是一个新的Observable对象,传入的参数是一个新的OnSubscribe,而在创建OnSubscribe实例时传入了旧的OnSubscribe实例和 lift方法的参数。

  • OnSubscribeLift

步骤到这里符合一开始列出的lift操作符原理的前2步接收原OnSubscribe和当前的Operator,创建一个新的OnSubscribe并返回新的Observable。接下来查看OnSubscribeLift的具体实现:

OnSubscribeLift实现了OnSubscribe接口,成员变量parent很明显就是旧的OnSubscribe接口实例,operator则是包装我们传入Func1接口后的Operator对象。来捋一下思路,回顾RxJava 1.x版本中,在创建Observable时会传入OnSubscribe接口,内部源码subscribe订阅方法最核心的原理便是调用OnSubscribe接口的call方法,并传入Subscriber实例,此实例包装了我们实现的observer接口,因此我们在OnSubscribe接口调用数据传递onNext方法才回被调用。

因此与上同理,此处只是多用了一个map操作符,但是subscribe订阅方法的原理是相同的!此时创建Observable时传入的是OnSubscribeLift接口,内部源码subscribe订阅方法最后会调用OnSubscribeLift接口的call方法!解析来查看此方法中有什么特殊操作?

查看上图中OnSubscribeLift类的call方法,第一步RxJavaHooks调用了onObservableLift方法,实质就是将参数operator返回,因此RxJavaHooks.onObservableLift(operator)相当于operator,接着它调用了call方法,传入方法参数Subscriber,返回了新的Subscriber实例。第二步调用了parent,也就是OnSubscribe接口实例的call方法,并传入新的Subscriber实例。总体而言,除了常规调用OnSubscribe接口实例的call方法,特别之处在于传入call方法的参数是一个包装过新的Subscriber实例,此实例构造方法中传入了旧的Subscribe实例和运用于map方法的Func1接口!

步骤到这里符合一开始列出的lift操作符原理的后2步即用新的Subscriber包裹旧的Subscriber,在新的Subscriber里做完变换再传给旧的Subscribe。

  • Func1何时被调用?

以上一系列的分析至此,一切流程都似乎走通了,运用map方法后唯一的本质性改变就是在创建Observable又调用map方法之后,会重新创建Observable实例,此时传入一个新的OnSubscribe接口中,新的OnSubscribe接口中包装了旧的OnSubscribe和map方法中实现的Func1接口。而OnSubscribe接口的call方法中调用的依旧是旧的OnSubscribe的call方法,不同的是此处传入call方法的参数Subscriber是重新封装过的,此时的Subscriber包含了旧的Subscriber和map方法中实现的Func1接口。以上便是所有精华,可是似乎遗漏了一点,Func1接口又是在何时被调用的?其中原理如何?

  • OperatorMap

我们不得不再次回到OnSubscribeLift类的call方法中,回到第一行代码分析新的Subscriber实例的创建,其实是通过调用operator变量的call方法而来,可是Operator并不是幕后主使,operator成员变量是lift方法的参数传入进来的,而最初则是map方法中封装好的数据调用lift方法传入的,这个幕后BOSS就是OperatorMap类!洞悉其call方法,便知Func1接口又是在何时被调用的,详细查看OperatorMap源码详情:

首先OperatorMap类实现了Operator接口,且其成员变量transform就是我们实现处理结果的Func1接口,在构造方法中接受赋值。lift操作符原理的后2步,即用新的Subscriber包裹旧的Subscriber,在新的Subscriber里做完变换再传给旧的Subscribe,其根源就在此call方法中。首先通过成员变量Func1和参数Subscriber创建了MapSubscriber对象实例,对应红字前半句话,而后半句则要深入MapSubscriber查看:

  • MapSubscriber

MapSubscriber类继承于Subscriber,成员变量便是旧的Subscriber实例和Fun c1接口实例,查看onNext方法便可洞悉,首先调用Func1接口的call方法,即我们实现的数据预先处理方法,再调用旧的Subscriber实例的onNext方法,传入已经转化过的数据。

这也就意味着我们在实现OnSubscribe接口中的call方法操作Subscribe时,实际真正在处理的对象是MapSubscriber,是它首先根据Func1接口中的逻辑转换数据,再调用旧的Subscriber实例传递数据。到此处,已然柳暗花明!更是map操作符的核心处理位置!MapSubscriber中的操作有些类似于代理机制,内部对旧的Subscriber实例进行了一层包装,在调用旧的Subscriber实例中的方法之前自己做相应的处理,再将处理后的数据传递给它。



3. 总结

进行到此处,再去回顾一开始列举的lift运算原理,便可了然于心。RxJava 1.x中map操作符源码原理解析至此,已然真相大白。

最后再强调一次,运用map方法后唯一的本质性改变,就是在创建Observable后调用map方法时,map方法会重新创建并返回Observable实例,此时传入一个新的OnSubscribe接口中:

  • 新的OnSubscribe接口中包装了旧的OnSubscribe和map方法中实现的Func1接口。重点在于OnSubscribe接口的call方法,该方法中调用的依旧是旧的OnSubscribe的call方法,但是传入call方法的参数Subscriber是重新封装过的
  • 新的Subscriber包含了旧的Subscriber和map方法中实现的Func1接口,与之不同的是新的Subscriber在实现onNextonCompleteonError方法时重新处理了相关逻辑,虽然其本质最后调用的还是旧的Subscriber的onNextonCompleteonError方法,重点就是在调用以上方法之前将获取的数据进行转化,即我们实现的Func1接口经过转化之后的数据再传递给旧的Subscriber的onNext

以上这些操作也就意味着,在运用map操作符之后,我们创建Observable时实现的OnSubscribe接口中的call方法的参数Subscriber,在使用Subscriber去调用onNextonComplete方法,内部实质是通过创建一个新的Subscriber来管理这一切!类似于一个中转站,现将数据转化再传递给旧的Subscriber,让它去接收。

笔者在理解这一切后,做一个简单的比喻:运用map操作符,RxJava内部就会去新建一个中转站来获得数据,经过处理之后再分发!以上分析再结合之前列举的lift原理步骤操作,一切皆可真相大白。





二. RxJava 2.x操作符

1. map操作符使用

                Observable.
                        create(new ObservableOnSubscribe<String>() 
                            @Override
                            public void subscribe(ObservableEmitter<String> e) throws Exception 
                                if (!e.isDisposed()) 
                                    e.onNext("1");
                                    e.onNext("2");
                                    e.onComplete();
                                
                            
                        ).
                        map(new Function<String, Integer>() 
                            @Override
                            public Integer apply(String s) throws Exception 
                                return Integer.parseInt(s);
                            
                        ).
                        subscribe(new Observer<Integer>() 
                            @Override
                            public void onSubscribe(Disposable d) 
                                System.out.println("onSubscribe");
                            
                            @Override
                            public void onNext(Integer value) 
                                System.out.println("onNext:" + value);
                            
                            @Override
                            public void onError(Throwable e) 
                            
                            @Override
                            public void onComplete() 
                                System.out.println("onComplete");
                            
                        );

2.x版本的使用与1.x版本基本相同,逻辑相同,此处不再赘述。RxJava 2.x无背压版本操作符涉及到的元素如下:

  • Function接口
  • AbstractObservableWithUpstream抽象类

其实在阅读RxJava 1.x中的map源码分析后,稍微打下了些基础,在进行2.x版本的map源码分析前,首先介绍一下AbstractObservableWithUpstream抽象类,带着微微印象与了解再去解析源码:

  • 继承此类
  • 利用其subscribeActual方法
  • 用原Observable去subscribe变换后的Observer


2. map源码分析

map方法源码如上,核心关键最后一句代码,又是熟悉RxJavaPlugins的nAssembly方法,没有什么实质内容,就是返回其参数。此处的参数是创建一个ObservableMap对象实例,传入Observable和map方法中的参数Function接口。接下来查看ObservableMap源码:

  • ObservableMap

如上,可见ObservableMap是继承于AbstractObservableWithUpstream,而它又继承于Observable。熟悉第一篇基本元素源码分析的朋友会发现ObservableMap中也有熟悉的subscribeActual方法,笔者凭着自己微薄的记忆力,记得Observable的subscribe方法,最终调用的是subscribeActual方法,即调用创建Observable对象时传入的参数——OnSubscribe接口的方法。再回到ObservableMap中,发现其操作相同!区别在于此处传入的参数不再是Observer接口,而是MapObserver类型参数。

以上步骤也符合一开始列觉原理步骤的前三步,继续查看MapObserver源码:

其实MapObserver最终的父类还是Observer,只不过此处多做了一层层封装,其中两个成员变量是旧的Observer接口和Function实例。

  • 笔者推测,套路总结

阅读自此,还未研究此类详细实现的笔者已然心知肚明!这种套路简直太熟悉了,创建一个类的新的实例,传入已存在即旧的类实例,并传入我们实现转换数据逻辑的Function接口。笔者大言不惭的说内部原理肯定是有关onNextonComplete方法调用者还是旧的类实例,而创建新的类的目的就是多做了一层包装,即调用Function接口预先将数据转化之后,再分发给旧的类实例去调用onNext等传递数据

好了,以上笔者一顿柯南附身后,举例其onNext方法,总结此方法中应当只做两步骤:调用Function接口转化数据,调用旧的Observer接口的onNext方法,将数据传递给它。

还是回到源码中,先忽略掉那些判断,重点在于红框部分,发现二个步骤确实如上所说!调用了Function接口我们实现的apply方法,将转化后的数据v作为参数,传递到旧的Observer接口的onNext方法中!完美契合~



3. Rxjava 1.x&2.x 对比总结map原理⭐️⭐️⭐️⭐️⭐️

笔者在分析RxJava 2.x版本的map操作符源码后半部分简直如虎添翼,因为与1.x 版本的处理太过相似,换汤不换药。此点来分析总结两者的原理区别:

(1)RxJava 1.x map原理

  • RxJava 1.x版本的map操作符,内部主要是lift在处理,但核心依旧是创建一个新的Observable实例返回,传入一个新的OnSubscribe接口(OnSubscribeList类型)
  • 新的OnSubscribe接口内封装了旧的OnSubscribe实例和Func1接口,重点在于该OnSubscribe接口call方法的特殊处理,其内部还是调用的旧的OnSubscribe实例call方法,不过传入的是一个新的Subscriber实例(MapSubscriber类型)
  • 新的Subscriber实例内又是封装了旧的Subscriber实例和Func1接口,Subscriber类在实现Observer接应的onNextonComplete等方法时,调用的还是旧的Subscriber实例的onNext等方法,不同的也是参数,预先获取的数据通过Func1接口转化之后再传入。

以上便是RxJava 1.x对于map操作的核心原理及处理步骤,可知Func1接口最终调用的地方是在新创建的Subscriber类实现的onNextonComplete等方法时相应调用处理数据转化。其实使用map操作就是多了一个中转站,此处就是新的Subscriber实例,它预先获取数据,将其转换后再分发给旧的Subscriber实例。


(2)订阅基本元素原理回顾

再次回顾RxJava 1.x版本中Observable的subscribe处理,通过调用创建Observable传入的OnSubscribe接口的call方法正式触发订阅事件,后续Observer接口中onNext、onComplete方法才回被调用。而RxJava 2.x版本中的处理亦如是,只不过OnSubscribe接口换成了ObservableOnSubscribe接口,call方法换成了subscribe方法,参数由Subscriber更换成了ObservableEmitter。

首先将两个版本基本元素原理的易混淆点提前解释:

  • Observable对象create方法:
    • RxJava 1.x 中调用Observable的create方法创建实例时,是直接new 一个Observable对象,直接传入OnSubscribe接口。
    • 2.x 版本是new 一个ObservableCreate对象(继承于Observable),类似于多做了一层封装,与之对应的OnSubscribe接口作为其成员变量传入。
  • Observable对象subscribe方法:
    • RxJava 1.x 内部是通过重载subscribe方法,最终核心处就是调用OnSubscribe接口的call方法,传入Subscriber类型参数正式触发订阅事件。
    • 2.x 版本最终调用的是一开始创建Observable对象的subscribeActual方法,方法内的最终核心处也是调用OnSubscribe接口的subscribe方法,传入ObservableEmitter类型参数正式触发订阅事件。

(3)RxJava 2.x map原理

在回顾完两个版本的订阅基本元素原理后,再来总结两者对于map操作的原理比较,更为容易。

  • RxJava 2.x版本的map操作符与1.x 版本相比,内部少了一层lift封装,但核心依旧是创建一个新的Observable实例(ObservableMap类型,与创建Observable对象时返回的ObservableCreate类型相似)返回,传入旧的Observable实例和Function接口。
  • 重点还是在此ObservableMap中的subscribeActual方法,注意上一篇文章即上一点都强调过,该方法就是Observable调用subscribe订阅事件时处理的核心,其逻辑就是调用OnSubscribe接口的subscribe方法,传入ObservableEmitter类型参数正式触发订阅事件。再回到ObservableMap中的subscribeActual方法中,确实如此,唯一改变就是传入的不是ObservableEmitter,而直接是一个实现了Observer接口的MapObserver实例(与1.x 版本中实现了Observer接口的Subscriber作用何其相似!)
  • 新的MapObserver实例内封装了Observer接口实例和Function接口,起作用就是对Observer接口内待实现的方法多做了一层封装!在实现onNextonComplete等方法时,调用的还是旧的Subscriber实例的onNext等方法,不同的是参数,将预先获取的数据通过Function接口转化之后再传入。

以上便是RxJava 2.x对于map操作的核心原理及处理步骤,可知Function接口最终调用的地方是在新创建的MapObserver类实现的onNextonComplete等方法时相应调用处理数据转化。其原理与1.x 版本相差无几!其实使用map操作就是多了一个中转站,此处就是新的MapObserver实例,它预先获取数据,将其转换后再分发给Observer接口。



4. operator接口拓展——lift

在分析RxJava 1.x版本中map操作原理时,发现幕后主要是lift操作符处理所有逻辑,而2.x版本中却未见lift 踪影,因此直接在2.x 版本的Observable类中搜索lift方法查看其原理:

以上是lift方法源码,其参数ObservableOperator接口,即我们在调用lift操作符时需要实现的接口,源码如下:

与Function接口类似,但作用不同,其参数也不同!继续回到lift方法,其核心部分返回继承了Observable的ObservableLift对象实例,传入了Observable实例和接口,查看ObservableLift源码:

毫无疑问,同样ObservableLift也实现了subscribeActual 方法,其内部处理与map方法中返回的ObservableMap对象中实现的逻辑相同,即调用创建Observable对象时传入的参数——OnSubscribe接口的方法,传入一个新的Observer接口实例。

在了解之后可以总结Operator接口原理步骤:

  • 实现此接口
  • subscribeActual 中做变换
  • 用于扩展自定义操作符




Rxjava 2.x的无背压版本map、lift操作符原理同无背压版完全相同,只是相关处理类由Observablexxx 变成了Flowablexxx,在此不再赘述。

笔者刚开始分析Rxjava 1.xmap、lift操作符原理时,完全把自己绕进去了,苦思不得其解,后来反复阅读上一篇订阅基本流程的原理分析博客,总算想通了。源码分析不是一个简单的过程,笔者更不敢大言不惭得说自己的博客通俗易懂,功力实在有限。最后笔者建议源码分析过程有疑问者可着重阅读文章每一大点的小结部分,特别是本篇第二大点中的“Rxjava 1.x&2.x 对比总结map原理”,使出浑身解数才尽力深入浅出总结而来。能够理解以上知识点,足以举一反三

若有错误,虚心指教~

以上是关于浅析RxJava 1.x&2.x版本区别及原理:maplift操作符源码解析的主要内容,如果未能解决你的问题,请参考以下文章

浅析RxJava 1.x&2.x版本使用区别及原理:ObservableFlowable等基本元素源码解析

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

浅析RxJava 1.x&2.x版本使用区别及原理:ObservableFlowable等基本元素源码解析

Android异步框架RxJava 1.x系列 - 观察者模式及实现