Rxjava 源码解析 - 线程切换源码

Posted 许佳佳233

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 源码解析 - 线程切换源码相关的知识,希望对你有一定的参考价值。

概述

前文已经讲了rxjava 简单subscribe的源码,有兴趣的读者可以看下。
Rxjava 源码解析(一) - subscribe源码

本文将基于前文的分析,继续讲下rxjava中断线程切换。

Demo

主要逻辑如下:

  1. Observable调用create()创建 ObservableCreate
  2. ObservableCreate调用map,返回ObservableMap
  3. ObservableMap调用subscribeOn,返回ObservableSubscribeOn
  4. 返回ObservableSubscribeOn调用observeOn,返回ObservableSubscribeOn
  5. ObservableSubscribeOn调用subscribe,执行最终的逻辑。
Observable
        .create(new ObservableOnSubscribe<String>() {

          @Override
          public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
            Log.i("RxJavaTest", "subscribe");
            emitter.onNext("123");
          }
        }).map(new Function<String, String>() {

      @Override
      public String apply(String s) throws Throwable {
        return s + "456";
      }
    })
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe(new Observer<String>() {

          @Override
          public void onSubscribe(@NonNull Disposable d) {
            Log.i("RxJavaTest", "onSubscribe");
          }

          @Override
          public void onNext(@NonNull String s) {
            Log.i("RxJavaTest", "onNext: " + s);
          }

          @Override
          public void onError(@NonNull Throwable e) {
            Log.i("RxJavaTest", "onError");
          }

          @Override
          public void onComplete() {
            Log.i("RxJavaTest", "onComplete");
          }
        });

运行结果

RxJavaTest: onSubscribe
RxJavaTest: subscribe
RxJavaTest: onNext: 123456

原理图


上图是根据demo整理的关键逻辑的流程图。
主要内容有以下几点:

  • 每个Observable会对应一个Observer,Observer是Observable的内部类,比如MapObserver就是ObservableMap的内部类。
  • 当前的Observable会存储上一个Observable,即source对象。比如在Demo
    中ObservableMap的source就是ObservableCreate。
  • Observer会存储上一个Observer,即downstream。比如在Demo中,MapObserver的downstream就是CreateEmitter。
  • 从调用第一个subscribe方法起,会一层一层的去调用Observable的subscribe方法,最终调用到的是Observable的subscribeActual方法,这个每个子类的逻辑不一样。
    比如ObservableObserveOn的subscribeActual方法就会去获取到执行Observer的Worker然后传给source。
    再比如ObservableMap会将定义map方法的Function对象传给source。
  • 此处举例使用的是Observer.onNext方法,当然也可以是onError等其他方法。每个Observer会引用上一个Observer,即downstream。在Demo中最后一个Observer就是ObserveOnObserver。
    每个Observer对onNext等方法的处理逻辑也都是不一样的,还是举例来说明。
    比如MapObserver的onNext中,就是先将结果执行map中的逻辑,然后再传给downstream的onNext方法处理。
    比如ObserveOnObserver的onNext中,会将source的onNext方法抛到Worker中去处理,或者说抛到具体的某个线程中去处理。

关键源码解析

Observable.subscribe

在Observable的subscribe方法中,最终会调用到subscribeActual方法,subscribeActual方法是一个虚方法,会在子类中实现

    public final void subscribe(@NonNull Observer<? super T> observer) {
        Objects.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
    protected abstract void subscribeActual(@NonNull Observer<? super T> observer);

Observable.subscribeOn

subscribeOn最终创建的是ObservableSubscribeOn对象。

    public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
    }

ObservableSubscribeOn.subscribeActual

subscribeOn这个方法是来决定订阅时执行的线程,因此在此处就需要抛到对应的线程去运行。

在subscribeActual中,最终会调用到Scheduler.scheduleDirect方法,在其中会将Observer中的逻辑放到新建的Worker中运行,即放到Scheduler设置的线程中运行。

    public void subscribeActual(final Observer<? super T> observer) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);

        observer.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

ObservableSubscribeOn。SubscribeOnObserver.onNext

SubscribeOnObserver的onnext就只是调用了下上一个Observer的onNext方法。

此处有的读者可以会有点混乱,额外解释下。
Observable.subscribeOn:定义订阅时执行的线程
Observable.observeOn: 定义监听时执行的线程

        public void onNext(T t) {
            downstream.onNext(t);
        }

Observable.observeOn

observeOn方法最终返回的是ObservableObserveOn对象。

    public final Observable<T> observeOn(@NonNull Scheduler scheduler) {
        return observeOn(scheduler, false, bufferSize());
    }
    public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
        Objects.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
    }

ObservableObserveOn.subscribeActual

此处会创建Worker,即定义Observer要运行的线程,然后创建Observer传给source.subscribe方法。

    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }

ObservableObserveOn.ObserveOnObserver.onNext

observeOn这个方法是来决定监听时执行的线程,因此会在Observer的方法中,将逻辑抛到对应的线程中执行。

此处最终会调用到worker.schedule方法,将逻辑放到对应的线程中去执行。

        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

以上是关于Rxjava 源码解析 - 线程切换源码的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava 源码解析 - 线程切换源码

Rxjava 源码解析 - Schedulers默认线程池

Rxjava 源码解析 - Schedulers默认线程池

Rxjava 源码解析 - Schedulers默认线程池

Rxjava 源码解析 - subscribe源码

Rxjava 源码解析 - subscribe源码