Rxjava 源码解析 - subscribe源码

Posted 许佳佳233

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 源码解析 - subscribe源码相关的知识,希望对你有一定的参考价值。

概述

rxjava的应用还是较为广泛的,在实际项目中经常与MVP一起使用,可以使代码的可读性更高。在较为复杂的一些场景下,也可以使代码更加的简洁。
本文将会是对rxjava源码的探索,适合已经对rxjava的使用有一定经验的读者。

对MVP有兴趣的读者可以看下笔者的前文:
MVC、MVP、MVVM小记

同步Demo

主要逻辑如下:

  1. Observable调用create()创建 ObservableCreate
  2. ObservableCreate调用map,返回ObservableMap
  3. ObservableMap调用subscribe,执行最终的逻辑。
     Observable.create(new ObservableOnSubscribe<String>() {

      @Override
      public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
        Log.i("RxJavaTest", "subscribe");
        emitter.onNext("123");
      }
    }).map(new Function<String, String>() {

      @Override
      public String apply(String s) throws Throwable {
        return s + "456";
      }
    }).subscribe(new Observer<String>() {

      @Override
      public void onSubscribe(@NonNull Disposable d) {
        Log.i("RxJavaTest", "onSubscribe");
      }

      @Override
      public void onNext(@NonNull String s) {
        Log.i("RxJavaTest", "onNext: " + s);
      }

      @Override
      public void onError(@NonNull Throwable e) {
        Log.i("RxJavaTest", "onError");
      }

      @Override
      public void onComplete() {
        Log.i("RxJavaTest", "onComplete");
      }
    });

运行结果

RxJavaTest: onSubscribe
RxJavaTest: subscribe
RxJavaTest: onNext: 123456

onNext执行任务栈

此处由于都是在当前线程运行的,所以可以通过任务栈看到大概逻辑,如果是在其他线程执行的,则用打印任务栈的方式无法看到全貌。
读者如果有兴趣,可以通过打断点在android studio中看到任务栈

从执行subscribe开始,到最终调用到onNext的任务栈如下:

  • ObservableMap.subscribe
  • ObservableMap.subscribeActual
  • ObservableCreate.subscribe
  • ObservableCreate.subscribeActual
  • MapObserver.onNext
  • Observer.onNext

源码解析

Observable.subscribe

主要逻辑如下:

  • 会调用RxJavaPlugins.onSubscribe 来执行hook的逻辑,这个逻辑不设置的话就没有。
  • 然后就是会调用到subscribeActual方法,这个方法在Observable中是abstract方法,在它的子类中才会实现,比如ObservableMap和ObservableCreate都是它的子类。
    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

ObservableMap.subscribeActual

主要逻辑:

  • ObservableMap.subscribeActual最终调用到的是source.subscribe
  • source是在构造函数的时候传进来的
  • ObservableMap的构造函数,是在ObservableCreate的map方法中调用的
  • 所以此处的source就是ObservableCreate,最终调用到的是ObservableCreate.subscribe
    @Override
    public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MapObserver<T, U>(t, function));
    }
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {

    /** The source consumable Observable. */
    protected final ObservableSource<T> source;

    /**
     * Constructs the ObservableSource with the given consumable.
     * @param source the consumable Observable
     */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }

    @Override
    public final ObservableSource<T> source() {
        return source;
    }

}
    public final <@NonNull R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
        Objects.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
    }

ObservableCreate.subscribeActual

ObservableCreate.subscribe调用的其实就是Observable.subscribe,这个前面讲到,最终会调用到ObservableCreate.subscribeActual。

主要逻辑:

  • 首先会调用到Observer的onSubscribe逻辑
  • 然后会在try-catch中执行subscribe
  • 如果try-catch中出现异常,会调用 onError的逻辑
  • 此处subscribeActual传进来的参数实际上是MapObserver,因此Demo中调用onNext方法的对象也是MapObserver
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
 Observable.create(new ObservableOnSubscribe<String>() {

      @Override
      public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
        Log.i("RxJavaTest", "subscribe");
        emitter.onNext("123");
      }
    })

ObservableMap.MapObserver.onNext

主要逻辑:

  • 先通过mapper.apply将onNext参数处理
  • 处理完成的数传给downstream的onNext处理
  • 此处downstream其实就是Demo中subsrcibe的参数,具体的可以通过MapObserver和其父类BasicFuseableObserver的构造函数看出。
    static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }
        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != NONE) {
                downstream.onNext(null);
                return;
            }

            U v;

            try {
                v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
            } catch (Throwable ex) {
                fail(ex);
                return;
            }
            downstream.onNext(v);
        }
        
————————————————省略
}
    public BasicFuseableObserver(Observer<? super R> downstream) {
        this.downstream = downstream;
    }

以上是关于Rxjava 源码解析 - subscribe源码的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava 源码解析 - 线程切换源码

Rxjava 源码解析 - 线程切换源码

Rxjava 源码解析 - subscribe源码

Rxjava 源码解析 - Schedulers默认线程池

Rxjava 源码解析 - Schedulers默认线程池

Rxjava 源码解析 - Schedulers默认线程池