Rxjava 源码解析 - subscribe源码

Posted 许佳佳233

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 源码解析 - subscribe源码相关的知识,希望对你有一定的参考价值。

Rxjava源码解析系列:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码
Rxjava 源码解析(三) - Schedulers默认线程池

概述

rxjava的应用还是较为广泛的,在实际项目中经常与MVP一起使用,可以使代码的可读性更高。在较为复杂的一些场景下,也可以使代码更加的简洁。
本文将会是对rxjava源码的探索,适合已经对rxjava的使用有一定经验的读者。

对MVP有兴趣的读者可以看下笔者的前文:
MVC、MVP、MVVM小记

同步Demo

主要逻辑如下:

  1. Observable调用create()创建 ObservableCreate
  2. ObservableCreate调用map,返回ObservableMap
  3. ObservableMap调用subscribe,执行最终的逻辑。
     Observable.create(new ObservableOnSubscribe<String>() 

      @Override
      public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable 
        Log.i("RxJavaTest", "subscribe");
        emitter.onNext("123");
      
    ).map(new Function<String, String>() 

      @Override
      public String apply(String s) throws Throwable 
        return s + "456";
      
    ).subscribe(new Observer<String>() 

      @Override
      public void onSubscribe(@NonNull Disposable d) 
        Log.i("RxJavaTest", "onSubscribe");
      

      @Override
      public void onNext(@NonNull String s) 
        Log.i("RxJavaTest", "onNext: " + s);
      

      @Override
      public void onError(@NonNull Throwable e) 
        Log.i("RxJavaTest", "onError");
      

      @Override
      public void onComplete() 
        Log.i("RxJavaTest", "onComplete");
      
    );

运行结果

RxJavaTest: onSubscribe
RxJavaTest: subscribe
RxJavaTest: onNext: 123456

onNext执行任务栈

此处由于都是在当前线程运行的,所以可以通过任务栈看到大概逻辑,如果是在其他线程执行的,则用打印任务栈的方式无法看到全貌。
读者如果有兴趣,可以通过打断点在android studio中看到任务栈

从执行subscribe开始,到最终调用到onNext的任务栈如下:

  • ObservableMap.subscribe
  • ObservableMap.subscribeActual
  • ObservableCreate.subscribe
  • ObservableCreate.subscribeActual
  • MapObserver.onNext
  • Observer.onNext

源码解析

Observable.subscribe

主要逻辑如下:

  • 会调用RxJavaPlugins.onSubscribe 来执行hook的逻辑,这个逻辑不设置的话就没有。
  • 然后就是会调用到subscribeActual方法,这个方法在Observable中是abstract方法,在它的子类中才会实现,比如ObservableMap和ObservableCreate都是它的子类。
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(@NonNull Observer<? super T> observer) 
        Objects.requireNonNull(observer, "observer is null");
        try 
            observer = RxJavaPlugins.onSubscribe(this, observer);

            Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
         catch (NullPointerException e)  // NOPMD
            throw e;
         catch (Throwable e) 
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        
    

ObservableMap.subscribeActual

主要逻辑:

  • ObservableMap.subscribeActual最终调用到的是source.subscribe
  • source是在构造函数的时候传进来的
  • ObservableMap的构造函数,是在ObservableCreate的map方法中调用的
  • 所以此处的source就是ObservableCreate,最终调用到的是ObservableCreate.subscribe
    @Override
    public void subscribeActual(Observer<? super U> t) 
        source.subscribe(new MapObserver<T, U>(t, function));
    
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> 

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) 
        this.source = source;
    

    @Override
    public final ObservableSource<T> source() 
        return source;
    


    public final <@NonNull R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) 
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
    

ObservableCreate.subscribeActual

ObservableCreate.subscribe调用的其实就是Observable.subscribe,这个前面讲到,最终会调用到ObservableCreate.subscribeActual。

主要逻辑:

  • 首先会调用到Observer的onSubscribe逻辑
  • 然后会在try-catch中执行subscribe
  • 如果try-catch中出现异常,会调用 onError的逻辑
  • 此处subscribeActual传进来的参数实际上是MapObserver,因此Demo中调用onNext方法的对象也是MapObserver
    @Override
    protected void subscribeActual(Observer<? super T> observer) 
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        observer.onSubscribe(parent);

        try 
            source.subscribe(parent);
         catch (Throwable ex) 
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        
    
 Observable.create(new ObservableOnSubscribe<String>() 

      @Override
      public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable 
        Log.i("RxJavaTest", "subscribe");
        emitter.onNext("123");
      
    )

ObservableMap.MapObserver.onNext

主要逻辑:

  • 先通过mapper.apply将onNext参数处理
  • 处理完成的数传给downstream的onNext处理
  • 此处downstream其实就是Demo中subsrcibe的参数,具体的可以通过MapObserver和其父类BasicFuseableObserver的构造函数看出。
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> 
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) 
            super(actual);
            this.mapper = mapper;
        
        @Override
        public void onNext(T t) 
            if (done) 
                return;
            

            if (sourceMode != NONE) 
                downstream.onNext(null);
                return;
            

            U v;

            try 
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
             catch (Throwable ex) 
                fail(ex);
                return;
            
            downstream.onNext(v);
        
        
————————————————省略

    public BasicFuseableObserver(Observer<? super R> downstream) 
        this.downstream = downstream;
    

以上是关于Rxjava 源码解析 - subscribe源码的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava 源码解析 - 线程切换源码

Rxjava 源码解析 - 线程切换源码

Rxjava 源码解析 - subscribe源码

Rxjava 源码解析 - Schedulers默认线程池

Rxjava 源码解析 - Schedulers默认线程池

Rxjava 源码解析 - Schedulers默认线程池