RxJava之七——RxJava 2.0 图文分析create() subscribe()map()observeOn()subscribeOn()源码

Posted 薛瑄

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava之七——RxJava 2.0 图文分析create() subscribe()map()observeOn()subscribeOn()源码相关的知识,希望对你有一定的参考价值。

前言

16年 的时候写过两篇关于Rxjava 1.0 的源码分析,时过境迁,现在早已是2.0 了。2.0 的代码逻辑,封装,更为易懂,也包含了 一些新特性背压,面向切面等等。所以决定,写篇文章分析RxJava 2.0

关于RxJava,从表面上看起来很容易使用,但是如果理解不够深刻,使用过程中,往往会出现一些问题,所以我写了系列文章,从入门到精通,从简单的使用到部分源码详解,希望能给读者一个质的飞跃:
1、RxJava之一——一次性学会使用RxJava RxJava简单的使用和使用它的好处
2、RxJava之二——Single和Subject 与Observable举足轻重的类,虽然用的少,但应该知道
3、RxJava之三——RxJava 2.0 全部操作符示例
4、RxJava之四—— Lift()详解 想要了解Operators,Lift()一定要学习
5、RxJava之五—— observeOn()与subscribeOn()的详解Scheduler线程切换的原理
6、RxJava之六——RxBus 通过RxJava来替换EventBus
7、RxJava之七——RxJava 2.0 图文分析create()、 subscribe()、map()、observeOn()、subscribeOn()源码 这张图可能是全网最详细 明了的图

Rxjava2.x 与1.x 的相关文章:

关于 RxJava 最友好的文章—— RxJava 2.0 全新来袭
官方文档:What’s different in 2.0
RxJava github

示例

Rxjava的使用流程,相信大家都很清楚了,以下面这个简单的demo,重点分析一下create()、 subscribe()、map()、observeOn()、subscribeOn()源码。 只要了解这些源码,再去看其他的类都会有似曾相识的感觉

        Observable.create(object : ObservableOnSubscribe<Int> 
            override fun subscribe(emitter: ObservableEmitter<Int>) 
                  emitter.onNext(1)
            )
            .map(object : Function<Int, String> 
                override fun apply(t: Int): String 
                    return t.toString()
                
            )
            .subscribeOn(Schedulers.io())
            .observeOn(androidSchedulers.mainThread())
            .subscribe(object : Observer<String> 
                override fun onComplete() 
                

                override fun onSubscribe(d: Disposable) 
                

                override fun onNext(t: String) 
                

                override fun onError(e: Throwable) 
                
            )

在看源代码时,被其中一层一层的封装和调用,类名搞的晕晕的,一不留神就不知道谁是谁。
我把源代码的简单流程 ,画了个图,先脑子里有个整体的轮廓,再去看代码,会清晰很多

关于流程图的介绍:

这张图,可能是全网关于rxjava2 大体流程,最详细的图。为了排版和方便理解,简化了函数的关系,忽略了很多细节

  • 绿色模块 表示订阅者,例如 ObservableFlowable,或者他们的子类

  • 蓝色模块 表示观察者,例如 ObserverSubscriber,或者他们的子类

  • 青色模块 表示数据发送,例如:ObservableOnSubscribeObservableSource,等等

  • 黄色模块 表示切换了线程

  • 每个模块右上角表示当前类的名称

  • 绿色的线 表示函数调用

  • 蓝色的线 表示订阅

  • 红色的线 表示数据发送

  • 黑色的线 表示对象是什么或者对象从哪里来的

基本流程

还是以上面的demo为例,抛开操作符,只分析主要流程

先来看下,demo中都使用哪些类和接口:

  • Observable 订阅者
  • Observer 观察者
  • ObservableOnSubscribe 数据发送源
public interface ObservableOnSubscribe<T> 

    // ObservableEmitter 也是一个接口,它继承接口Emitter
    void subscribe(@NonNull ObservableEmitter<T> emitter) throws Exception;


//这个接口,有常见的几种函数。在demo中可以看到,调用的oNext 就是这个接口的函数
public interface Emitter<T> 
    void oNext(@NonNull T value);
    void onError(@NonNull Throwable error);
    void onComplete();

下面一步一步进入源码:

create

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) 
         // 判断非空
        ObjectHelper.requireNonNull(source, "source is null");
        //RxJavaPlugins 相当于是切面编程,会对所有的中间订阅者进行自定义修改,如果没有设置过。就直接当前参数
        //创建一个 ObservableCreate,它就是上图中的绿色模块
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    

来看一个RxJavaPlugins.onAssembly

    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) 
        // onObservableAssembly  需要手动设置,实现切面效果
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        //如果没有设置 就是null,直接返回参数source
        if (f != null) 
            return apply(f, source);
        
        return source;
    

create 传入的是ObservableOnSubscribe 类型参数,需要的Observable返回类型,这里的ObservableCreate 继承了Observable 并实现接口subscribeActual,引用了ObservableOnSubscribe 并调用它的接口subscribe,所以它算是一种适配器模式。

public final class ObservableCreate<T> extends Observable<T> 

    // 每个Observable 的实现类,都有一个source,表示的是上游
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) 
        this.source = source;
    

    // subscribe 最终会调用到这里
    // observer 是观察者,它里面实现了onNext、onSubscribe 等 这些函数
    @Override
    protected void subscribeActual(Observer<? super T> observer) 
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //调用了观察者的onSubscribe 方法
        observer.onSubscribe(parent);

        try 
            //这个上游source 就是demo中的ObservableOnSubscribe 实现,
            //调用了subscribe,以本例来说,就会执行 emitter.onNext(1)
            // 那么问题来了,subscribeActual 会在什么时候执行呢?
            source.subscribe(parent);
         catch (Throwable ex) 
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        
    

subscribe

需要注意,代码执行到这里,是谁调用了subscribe,create() 返回的是Observable类型 ObservableCreate,所以是ObservableCreate调用了subscribe,那么关于Observable的接口,自然也会调用到ObservableCreate

    public final void subscribe(Observer<? super T> observer) 
        ObjectHelper.requireNonNull(observer, "observer is null");
        try 
            // 面向切面的设置,如果没有设置,直接返回observer
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            //以本例来看,就是执行ObservableCreate 中的subscribeActual 
            subscribeActual(observer);
         catch (NullPointerException e)  // NOPMD
            throw e;
         catch (Throwable e) 
            ... 省略代码 ...
        
    
ObservableCreate 类
    // subscribe 最终会调用到这里
    // observer 是观察者,它里面实现了onNext、onSubscribe 等 这些函数
    @Override
    protected void subscribeActual(Observer<? super T> observer) 
         // 把观察者,封装成CreateEmitter
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //调用了观察者的onSubscribe 方法
        observer.onSubscribe(parent);

        try 
            //这个上游source 就是demo中的ObservableOnSubscribe 实现,
            //调用了subscribe,以本例来说,就会执行 emitter.onNext(1),也就是执行了parent.onNext(1)
            source.subscribe(parent);
         catch (Throwable ex) 
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        
    

最后进入CreateEmitter 来看一下:

 static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable 

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;

        CreateEmitter(Observer<? super T> observer) 
            //demo 中的observer 最终被传递到这里
            this.observer = observer;
        

       // 在subscribeActual 调用到这里
        @Override
        public void onNext(T t) 
            if (t == null) 
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            
            if (!isDisposed()) 
                //如果没有disposed ,就会执行demo中observer的onNext
                observer.onNext(t);
            
        
          ... 省略代码 ...

        @Override
        public void onComplete() 
            //如果没有被dispose,会调用Observer的onComplete()方法
            if (!isDisposed()) 
                try 
                    observer.onComplete();
                 finally 
                    //执行完调用dispose
                    dispose();
                
            
        

       ... 省略代码 ...
    

源码看到这里,再去看上图,思路应该更清晰,但是应该对整图还不是很了解。

仔细总结一下会发现,在create后,创建了ObservableCreate ,他知道上游(source),知道被订阅后的处理方式(subscribeActual )也就是如何把数据发给下游,但是它需要等待调用subscribe,才会最终触发这个流程。

而且ObservableCreate 是继承于Observable ,对设计模式敏感的小伙伴,可能会想到装饰着模式,没错,所谓的操作符,无非就是用装饰着模式包裹一层,让他也知道上游(source),知道如何数据发给下游(实现subscribeActual ),最终subscribe 一调用,这个过程就被触发。

如果感觉混乱,没关系,下面跟着源码走一下,就会豁然开朗

map操作符

调用map,会执行下面的函数

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) 
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        //发现了吗,上面的create 创建ObservableCreate  , map 创建了 ObservableMap 
        //没错他们都是Observable的子类
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    

有了新的Observable (ObservableMap),那肯定得有新的Observer,不然ObservableMap和谁关联呢?是的,新的Observer就是 MapObserver

//AbstractObservableWithUpstream 继承于Observable,有成员变量source
public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> 
    final Function<? super T, ? extends U> function;

    //这里传入的是this,也就是本例中的ObservableCreate ,它继承于Observable 继承于ObservableSource
    public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) 
        // 把上游的源保存起来
        super(source);
        //保存当前的操作函数,也就是demo 中map 里实现的函数
        this.function = function;
    

    // 太熟悉了,又是它,等ObservableMap 调用subscribe 的时候,会调用到这里
    @Override
    public void subscribeActual(Observer<? super U> t) 
       // 调用了上游(source)的subscribe,这就相当于触发了上游
       // 传入的参数是MapObserver,联想到上面的subscribe 直接订阅Observer
       // 这个MapObserver继承Observer,它的参数t 也是Observer,也就是把下游的Observer 包裹了一层,传递给上游。这正是装饰者模式
       // 本例中emitter.onNext(1)执行后,也就是会执行MapObserver中的onNext
        source.subscribe(new MapObserver<T, U>(t, function));
    

     //BasicFuseableObserver 继承了Observer,有成员变量downstream、upstream 等
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> 
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) 
            // 把下游的Observer 保存在downstream 中
            super(actual);
            // 保存map 变换操作
            this.mapper = mapper;
        

        // 本例中,onNext 是在emitter.onNext(1) 执行后,调用到这里的
        @Override
        public void onNext(T t) 
            if (done) 
                return;
            

            if (sourceMode != NONE) 
                downstream.onNext(null);
                return;
            
			//目标类型
            U v;

            try 
                // 实现了变化操作,把t 转为 v 
                v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
             catch (Throwable ex) 
                fail(ex);
                return;
            
            
            //调用了下游onNext,继续分发数据,此时的数据是转换后的目标数据
            downstream.onNext(v);
        

        @Override
        public int requestFusion(int mode) 
            return transitiveBoundaryFusion(mode);
        

        @Nullable
        @Override
        public U poll() throws Exception 
            T t = qd.poll();
            return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
        
    

此时再去看看上面的图,是不是有点感觉了,没错,全部都是这个套路,只是每次在装饰的时候,行为不一样

subscribeOn操作符

作用是控制subscribe 的线程,下面来看看subscribeOn 是如何实现的

不仔细看代码,还以为作者直接把实现map的代码拷贝了一份,简直太相似了

    public final Observable<T> subscribeOn(Scheduler scheduler) 
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //使用原来的Observable 和调度线程,创建一个新的Observable,就是ObservableSubscribeOn
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    

依旧是创建新的Observable(ObservableSubscribeOn) 和 Observer (SubscribeOnObserver)

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> 
    final Scheduler scheduler;

    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) 
        // 依旧是保存了上游 ObservableSource
        super(source);
        // 保存了线程调度的Scheduler 
        this.scheduler = scheduler;
    

    @Override
    public void subscribeActual(final Observer<? super T> observer) 
        // 使用装饰着模式,把原来的Observer 封装了一层
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);

		//注意,这里还没有切换新城,调用了onSubscribe
        observer.onSubscribe(parent);
		//这里是重点,
		//scheduler.scheduleDirect(new SubscribeTask(parent))   在新线程中执行parent,
		//parent.setDisposable  把新线程任务,加入到DisposableHelper,如果手动dispose后,保证线程可以停止
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable 

        private static final long serialVersionUID = 8094547886072529208L;
        final Observer<? super T> downstream;

        final AtomicReference<Disposable> upstream;

        SubscribeOnObserver(Observer<? super T> downstream) 
            this.downstream = downstream;
            this.upstream = new AtomicReference<Disposable>();
        

        @Override
        public void onSubscribe(Disposable d) 
            DisposableHelper.setOnce(this.upstream, d);
        

       // 下面的常规操作方法,就是调用downstream  的各个操作方法
        @Override
        public void onNext(T t) 
            downstream.onNext(t);
        

        @Override
        public void onError(Throwable t) 
            downstream.onError(t);
        

        @Override
        public void onComplete() 
            downstream.onComplete();
        

        @Override
        public void dispose() 
            DisposableHelper.dispose(upstream);
            DisposableHelper.dispose(this);
        

        @Override
        public boolean isDisposed() 
            return DisposableHelper.isDisposed(get());
        

        void setDisposable(Disposable d) 
           //把当前
            DisposableHelper.setOnce(this, d);
        
    

    // 实现了线程的Runnable 接口,目的是让线程可以调度
    final class SubscribeTask implements Runnable 
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) 
            this.parent = parent;
        

        @Override
        public void run() 
            // 会在新的线程中调用,source是上游的Observable
            source.subscribe(parent);
        
    

截止到这里,subscribeOn总体的逻辑已经,搞清楚了,再深入一点看一下scheduler.scheduleDirect(new SubscribeTask(parent)) 是如何实现线程切换的

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) 
        //调用scheduleDirect,参数表示立即执行
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) 
        //创建了一个worker,注意这里的createWorker(),是一个抽象方法,不同的线程,创建的worker 不一样。
        // 例如:Schedulers.io()  创建的是 EventLoopWorker
        final Worker w = createWorker();

        // 依旧是切面编程,对每个切换线程的, 包裹一层
        final Runnable decoratedRun = RxJavaPlugins.onScheduleRxJava入门之路

Rxjava源码分析&实践实践环节:map操作符功能实现

RxJava系列6(从微观角度解读RxJava源码)

Rxjava源码分析&实践RxJava基本原理分析之订阅流

RXjava解析我把RXjava的源码和这份面试都给你了,你还告诉我面不过拿不到offer?

RxJava前奏之原理分析