Rxjava 源码解析 - Schedulers默认线程池

Posted 许佳佳233

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 源码解析 - Schedulers默认线程池相关的知识,希望对你有一定的参考价值。

Rxjava源码解析系列:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码
Rxjava 源码解析(三) - Schedulers默认线程池

概述

前文已经分析过rxjava中我们常见用法的一些源码,还没有了解的读者推荐看下:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码

本文将分析下rxjava线程池相关的源码。
本文的内容大概有以下:

  • Schedulers.io()源码
  • 线程工厂RxThreadFactory源码
  • 线程池CachedWorkerPool源码
  • Worker的创建与调用

Schedulers.io()源码

rxjava内置的线程池除了io,还有newThead,single等,它们的逻辑都是类似的,本文就以io为例子来进行解析。

demo代码:

    Observable
        .create(new ObservableOnSubscribe<String>() {

          @Override
          public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
            Log.i("RxJavaTest", "subscribe");
            emitter.onNext("123");
          }
        }).map(new Function<String, String>() {

      @Override
      public String apply(String s) throws Throwable {
        return s + "456";
      }
    })
        .subscribeOn(Schedulers.io())
        .observeOn(Schedulers.io())
        .subscribe(new Observer<String>() {

          @Override
          public void onSubscribe(@NonNull Disposable d) {
            Log.i("RxJavaTest", "onSubscribe");
          }

          @Override
          public void onNext(@NonNull String s) {
            Log.i("RxJavaTest", "onNext: " + s);
          }

          @Override
          public void onError(@NonNull Throwable e) {
            Log.i("RxJavaTest", "onError");
          }

          @Override
          public void onComplete() {
            Log.i("RxJavaTest", "onComplete");
          }
        });

Schedulers.io()

RxJavaPlugins.onioscheduler是rxjava设置的代理,默认情况下没有,因此直接看下Schedulers.IO逻辑即可。

    public static Scheduler io() {
        return RxJavaPlugins.onIoScheduler(IO);
    }
    public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
        Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
        if (f == null) {
            return defaultScheduler;
        }
        return apply(f, defaultScheduler);
    }

Schedulers.IO

IO是Schedulers的静态对象,在这个类被调用的时候会初始化。

    static final Scheduler IO;
    static {
        SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());

        COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());

        IO = RxJavaPlugins.initIoScheduler(new IOTask());

        TRAMPOLINE = TrampolineScheduler.instance();

        NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
    }

Schedulers.IOTask

IOTask中最终返回的是IoHolder.DEFAULT对象。

    static final class IOTask implements Supplier<Scheduler> {
        @Override
        public Scheduler get() {
            return IoHolder.DEFAULT;
        }
    }

Schedulers.IoHolder.DEFAULT

    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

IoScheduler的构造

IoScheduler中的逻辑就比较关键了,主要有以下几点:

  • IoScheduler存储了线程工厂RxThreadFactory和线程池CachedWorkerPool。
  • 在IoScheduler构造函数中会去初始化这两个值
    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }
    static final RxThreadFactory WORKER_THREAD_FACTORY;
    static final CachedWorkerPool NONE;
    final ThreadFactory threadFactory;
    final AtomicReference<CachedWorkerPool> pool;
    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<>(NONE);
        start();
    }
    static {
————————————————省略
        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
        NONE.shutdown();
————————————————省略
    }

线程工厂RxThreadFactory源码

RxThreadFactory就是一个普通的线程工厂,主要逻辑如下:

  • 会使用prefix字段来作唯一标识
  • 优先级是调用的地方传进来的
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {

    private static final long serialVersionUID = -7789753024099756196L;

    final String prefix;

    final int priority;

    final boolean nonBlocking;

    public RxThreadFactory(String prefix) {
        this(prefix, Thread.NORM_PRIORITY, false);
    }

    public RxThreadFactory(String prefix, int priority) {
        this(prefix, priority, false);
    }

    public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
        this.prefix = prefix;
        this.priority = priority;
        this.nonBlocking = nonBlocking;
    }

    @Override
    public Thread newThread(@NonNull Runnable r) {
        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());

        String name = nameBuilder.toString();
        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
        t.setPriority(priority);
        t.setDaemon(true);
        return t;
    }

    @Override
    public String toString() {
        return "RxThreadFactory[" + prefix + "]";
    }

    static final class RxCustomThread extends Thread implements NonBlockingThread {
        RxCustomThread(Runnable run, String name) {
            super(run, name);
        }
    }

线程池CachedWorkerPool源码

CachedWorkerPool.get()

从CachedWorkerPool中获取Worker时,会先去expiringWorkerQueue中查看是否有缓存的Worker,如果没有的话就会传入ThreadFactory来创建一个新的Worker。

    static final class CachedWorkerPool implements Runnable {
        private final long keepAliveTime;
        private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
        final CompositeDisposable allWorkers;
        private final ScheduledExecutorService evictorService;
        private final Future<?> evictorTask;
        private final ThreadFactory threadFactory;
        
————————————————————省略

        ThreadWorker get() {
            if (allWorkers.isDisposed()) {
                return SHUTDOWN_THREAD_WORKER;
            }
            while (!expiringWorkerQueue.isEmpty()) {
                ThreadWorker threadWorker = expiringWorkerQueue.poll();
                if (threadWorker != null) {
                    return threadWorker;
                }
            }

            // No cached worker found, so create a new one.
            ThreadWorker w = new ThreadWorker(threadFactory);
            allWorkers.add(w);
            return w;
        }

————————————————————省略
    }

IoScheduler.ThreadWorker

主要逻辑如下:

  • 在创建ThreadWorker的时候,在它的父类NewThreadWorker中会创建ScheduledThreadPoolExecutor的线程池
  • 在调用schedule方法处理事件时,会调用这个线程池来处理。

对ioScheduler的两点总结:

  • IoScheduler的线程池与Worker是一一对应关系
  • IoScheduler的一个线程工厂会对应多个Worker和多个线程池

(线程池和Worker的对应关系每个Scheduler是不一样的,如SingleScheduler中就是一个线程池对应所有的Worker,当然开发者也可以完全自定义Scheduler的逻辑)

    static final class ThreadWorker extends NewThreadWorker {

        long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }

        public long getExpirationTime() {
            return expirationTime;
        }

        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }
    public NewThreadWorker(ThreadFactory threadFactory) {
        executor = SchedulerPoolFactory.create(threadFactory);
    }
    public static ScheduledExecutorService create(ThreadFactory factory) {
        final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
        exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
        return exec;
    }
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }

Worker的创建与调用

Worker在rxjava中的使用到的地方非常多,可以说Worker就是rxjava实现线程切换的关键。
此处以ObservableObserveOn这个类为例来说下Worker的创建与使用。

如果不了解ObservableObserveOn,或者不了解rxjava常用方法的源码的读者,可以先看下前文:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码

ObservableObserveOn中使用Worker的主要逻辑:

  • 在调用执行subscribe时,会创建Woker并传入ObserveOnObserver
  • 当执行onNext等回调时,会调用Worker的schedule方法。这样就可以实现线程切换的操作

ObservableObserveOn.subscribeActual

    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
        }
    }

ObservableObserveOn.ObserveOnObserver.onNext

        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.downstream = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

以上是关于Rxjava 源码解析 - Schedulers默认线程池的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava 源码解析 - Schedulers默认线程池

Rxjava 源码解析 - 线程切换源码

Rxjava 源码解析 - 线程切换源码

Rxjava 源码解析 - subscribe源码

Rxjava 源码解析 - subscribe源码

RxJava/RxAndroid 中的 Schedulers.computation() v/s Schedulers.io()