RxJava 线程模型分析

Posted Java与Android技术栈

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava 线程模型分析相关的知识,希望对你有一定的参考价值。

RxJava的被观察者在使用操作符时可以利用线程调度器--Scheduler来切换线程,例如

 
   
   
 
  1.        Observable.just("aaa","bbb")

  2.                .observeOn(Schedulers.newThread())

  3.                .map(new Function<String, String>() {

  4.                    @Override

  5.                    public String apply(@NonNull String s) throws Exception {

  6.                        return s.toUpperCase();

  7.                    }

  8.                })

  9.                .subscribeOn(Schedulers.single())

  10.                .observeOn(Schedulers.io())

  11.                .subscribe(new Consumer<String>() {

  12.                    @Override

  13.                    public void accept(@NonNull String s) throws Exception {

  14.                        System.out.println(s);

  15.                    }

  16.                });

被观察者(Observable、Flowable...)发射数据流之后,其操作符可以在不同的线程中加工数据流,最后被观察者在前台线程中接受并响应数据。

下图不同的箭头颜色表示不同的线程。


一. 线程调度器

Schedulers 是一个静态工厂类,通过分析Schedulers的源码可以看到它有多种不同类型的Scheduler。下面是Schedulers的各个工厂方法。

computation()用于CPU密集型的计算任务,但并不适合于IO操作。

 
   
   
 
  1.    @NonNull

  2.    public static Scheduler computation() {

  3.        return RxJavaPlugins.onComputationScheduler(COMPUTATION);

  4.    }

io()用于IO密集型任务,支持异步阻塞IO操作,这个调度器的线程池会根据需要增长。对于普通的计算任务,请使用Schedulers.computation()。

 
   
   
 
  1.    @NonNull

  2.    public static Scheduler io() {

  3.        return RxJavaPlugins.onioscheduler(IO);

  4.    }

trampoline()在RxJava2中跟RxJava1的作用是不同的。在RxJava2中表示立即执行,如果当前线程有任务在执行,则会将其暂停,等插入进来的新任务执行完之后,再将原先未完成的任务接着执行。在RxJava1中表示在当前线程中等待其他任务完成之后,再执行新的任务。

 
   
   
 
  1.    @NonNull

  2.    public static Scheduler trampoline() {

  3.        return TRAMPOLINE;

  4.    }

newThread()为每个任务创建一个新线程。

 
   
   
 
  1.    @NonNull

  2.    public static Scheduler newThread() {

  3.        return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);

  4.    }

single()拥有一个线程单例,所有的任务都在这一个线程中执行,当此线程中有任务执行时,它的任务们将会按照先进先出的顺序依次执行。

 
   
   
 
  1.    @NonNull

  2.    public static Scheduler single() {

  3.        return RxJavaPlugins.onSingleScheduler(SINGLE);

  4.    }

除此之外,还支持自定义的Executor来作为调度器。

 
   
   
 
  1.    @NonNull

  2.    public static Scheduler from(@NonNull Executor executor) {

  3.        return new ExecutorScheduler(executor);

  4.    }


RxJava 线程模型分析


Scheduler是RxJava的线程任务调度器,Worker是线程任务的具体执行者。从Scheduler源码可以看到,Scheduler在scheduleDirect()、schedulePeriodicallyDirect()方法中创建了Worker,然后会分别调用worker的schedule()、schedulePeriodically()来执行任务。

 
   
   
 
  1.    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

  2.        final Worker w = createWorker();

  3.        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

  4.        DisposeTask task = new DisposeTask(decoratedRun, w);

  5.        w.schedule(task, delay, unit);

  6.        return task;

  7.    }

  8.    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {

  9.        final Worker w = createWorker();

  10.        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

  11.        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);

  12.        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);

  13.        if (d == EmptyDisposable.INSTANCE) {

  14.            return d;

  15.        }

  16.        return periodicTask;

  17.    }

Worker也是一个抽象类,从上图可以看到每一种Scheduler会对应一种具体的Worker。

 
   
   
 
  1.    public abstract static class Worker implements Disposable {

  2.        public Disposable schedule(@NonNull Runnable run) {

  3.            return schedule(run, 0L, TimeUnit.NANOSECONDS);

  4.        }

  5.        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

  6.        public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {

  7.            final SequentialDisposable first = new SequentialDisposable();

  8.            final SequentialDisposable sd = new SequentialDisposable(first);

  9.            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

  10.            final long periodInNanoseconds = unit.toNanos(period);

  11.            final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);

  12.            final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

  13.            Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,

  14.                    periodInNanoseconds), initialDelay, unit);

  15.            if (d == EmptyDisposable.INSTANCE) {

  16.                return d;

  17.            }

  18.            first.replace(d);

  19.            return sd;

  20.        }

  21.        public long now(@NonNull TimeUnit unit) {

  22.            return unit.convert(System.currentTimeMillis(), TimeUnit.MILLISECONDS);

  23.        }

  24.        ...

  25.    }

1.1 SingleScheduler

SingleScheduler是RxJava2新增的Scheduler。SingleScheduler中有一个属性叫executor,它是使用AtomicReference包装的ScheduledExecutorService。

 
   
   
 
  1. final AtomicReference<ScheduledExecutorService> executor = new AtomicReference<ScheduledExecutorService>();

在SingleScheduler构造函数中,executor会调用lazySet()。

 
   
   
 
  1.    public SingleScheduler(ThreadFactory threadFactory) {

  2.        this.threadFactory = threadFactory;

  3.        executor.lazySet(createExecutor(threadFactory));

  4.    }

它的createExecutor()用于创建工作线程,可以看到通过SchedulerPoolFactory来创建ScheduledExecutorService。

 
   
   
 
  1.    static ScheduledExecutorService createExecutor(ThreadFactory threadFactory) {

  2.        return SchedulerPoolFactory.create(threadFactory);

  3.    }

在SchedulerPoolFactory类的create(ThreadFactory factory) 中,使用newScheduledThreadPool线程池定义定时器,最大允许线程数为1。

 
   
   
 
  1.    public static ScheduledExecutorService create(ThreadFactory factory) {

  2.        final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);

  3.        if (exec instanceof ScheduledThreadPoolExecutor) {

  4.            ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;

  5.            POOLS.put(e, exec);

  6.        }

  7.        return exec;

  8.    }

在SingleScheduler中每次使用ScheduledExecutorService,其实是使用executor.get()。所以说,single拥有一个线程单例。

SingleScheduler会创建一个ScheduledWorker,ScheduledWorker使用jdk的ScheduledExecutorService作为executor。

下面是ScheduledWorker的schedule()方法。使用ScheduledExecutorService的submit()或schedule()来执行runnable。

 
   
   
 
  1.        @NonNull

  2.        @Override

  3.        public Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {

  4.            if (disposed) {

  5.                return EmptyDisposable.INSTANCE;

  6.            }

  7.            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

  8.            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, tasks);

  9.            tasks.add(sr);

  10.            try {

  11.                Future<?> f;

  12.                if (delay <= 0L) {

  13.                    f = executor.submit((Callable<Object>)sr);

  14.                } else {

  15.                    f = executor.schedule((Callable<Object>)sr, delay, unit);

  16.                }

  17.                sr.setFuture(f);

  18.            } catch (RejectedExecutionException ex) {

  19.                dispose();

  20.                RxJavaPlugins.onError(ex);

  21.                return EmptyDisposable.INSTANCE;

  22.            }

  23.            return sr;

  24.        }

1.2 ComputationScheduler

ComputationScheduler使用FixedSchedulerPool作为线程池,并且FixedSchedulerPool被AtomicReference包装了一下。

从ComputationScheduler的源码中可以看出,MAXTHREADS是CPU的数目。FixedSchedulerPool可以理解为拥有固定数量的线程池,数量为MAXTHREADS。

 
   
   
 
  1. static {

  2.     MAX_THREADS = cap(Runtime.getRuntime().availableProcessors(), Integer.getInteger(KEY_MAX_THREADS, 0));

  3.     ......

  4. }

  5. static int cap(int cpuCount, int paramThreads) {

  6.     return paramThreads <= 0 || paramThreads > cpuCount ? cpuCount : paramThreads;

  7. }

ComputationScheduler会创建一个EventLoopWorker。

 
   
   
 
  1.    @NonNull

  2.    @Override

  3.    public Worker createWorker() {

  4.        return new EventLoopWorker(pool.get().getEventLoop());

  5.    }

其中,getEventLoop()是FixedSchedulerPool中的方法,返回了FixedSchedulerPool中的一个PoolWorker。

 
   
   
 
  1.        public PoolWorker getEventLoop() {

  2.            int c = cores;

  3.            if (c == 0) {

  4.                return SHUTDOWN_WORKER;

  5.            }

  6.            // simple round robin, improvements to come

  7.            return eventLoops[(int)(n++ % c)];

  8.        }

PoolWorker继承自NewThreadWorker,它也是线程数为1的ScheduledExecutorService。

1.3 IoScheduler

IoScheduler使用CachedWorkerPool作为线程池,并且CachedWorkerPool也是被AtomicReference包装了一下。

CachedWorkerPool是基于RxThreadFactory这个ThreadFactory来创建的。

 
   
   
 
  1. static {

  2.        ......

  3.        WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);

  4.        ......

  5.        NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);

  6.       ......

  7. }

在RxThreadFactory中,由 prefix 和 incrementAndGet() 来创建新线程的名称。

 
   
   
 
  1.    @Override

  2.    public Thread newThread(Runnable r) {

  3.        StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());

  4.        String name = nameBuilder.toString();

  5.        Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);

  6.        t.setPriority(priority);

  7.        t.setDaemon(true);

  8.        return t;

  9.    }

IoScheduler创建的线程数是不固定的,可以通过IoScheduler 的 size() 来获得当前的线程数。而ComputationScheduler的线程数一般情况等于CPU的数目。

 
   
   
 
  1.    public int size() {

  2.        return pool.get().allWorkers.size();

  3.    }

特别需要的是 ComputationScheduler 和 IoScheduler 都是依赖线程池来维护线程的,区别就是 IoScheduler 线程池中的个数是无限的,由 prefix 和 incrementAndGet() 产生的递增值来决定线程的名字;而 ComputationScheduler 中则是一个固定线程数量的线程池,数据为CPU的数目,并且不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。

同样,IoScheduler也会创建EventLoopWorker。

 
   
   
 
  1.    @NonNull

  2.    @Override

  3.    public Worker createWorker() {

  4.        return new EventLoopWorker(pool.get());

  5.    }

但是这个EventLoopWorker是IoScheduler的内部类,跟ComputationScheduler创建的EventLoopWorker是不一样的,只是二者的名称相同罢了。

1.4 NewThreadScheduler

NewThreadScheduler会创建NewThreadWorker。我们看到NewThreadWorker的构造函数也是使用SchedulerPoolFactory。

 
   
   
 
  1.    public NewThreadWorker(ThreadFactory threadFactory) {

  2.        executor = SchedulerPoolFactory.create(threadFactory);

  3.    }

跟SingleScheduler不同的是,SingleScheduler的executor是使用AtomicReference包装的ScheduledExecutorService。每次使用时,会调用executor.get()。

然而,NewThreadScheduler每次都会创建一个新的线程。

1.5 TrampolineScheduler

TrampolineScheduler会创建TrampolineWorker,在TrampolineWorker内部维护着一个PriorityBlockingQueue。任务进入该队列之前,会先用TimedRunnable封装一下。

 
   
   
 
  1.    static final class TimedRunnable implements Comparable<TimedRunnable> {

  2.        final Runnable run;

  3.        final long execTime;

  4.        final int count; // In case if time between enqueueing took less than 1ms

  5.        volatile boolean disposed;

  6.        TimedRunnable(Runnable run, Long execTime, int count) {

  7.            this.run = run;

  8.            this.execTime = execTime;

  9.            this.count = count;

  10.        }

  11.        @Override

  12.        public int compareTo(TimedRunnable that) {

  13.            int result = ObjectHelper.compare(execTime, that.execTime);

  14.            if (result == 0) {

  15.                return ObjectHelper.compare(count, that.count);

  16.            }

  17.            return result;

  18.        }

  19.    }

我们可以看到TimedRunnable实现了Comparable接口,会比较任务的execTime和count。

任务在进入queue之前,count每次都会+1。

 
   
   
 
  1. final TimedRunnable timedRunnable = new TimedRunnable(action, execTime, counter.incrementAndGet());

  2. queue.add(timedRunnable);

所以,使用TrampolineScheduler时,每次新的任务都会优先执行。

二. 线程调度

在默认情况下不做任何线程处理,Observable和Observer是处于同一线程中的。如果想要切换线程的话,可以使用subscribeOn()和observeOn()。

2.1 线程调度subscribeOn

subscribeOn通过接收一个Scheduler参数,来指定对数据的处理运行在特定的线程调度器Scheduler上。

若多次执行subscribeOn,则只有一次起作用。

点击subscribeOn()的源码可以看到,每次调用subscribeOn()都会创建一个ObservableSubscribeOn对象。

 
   
   
 
  1.    public final Observable<T> subscribeOn(Scheduler scheduler) {

  2.        ObjectHelper.requireNonNull(scheduler, "scheduler is null");

  3.        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));

  4.    }

ObservableSubscribeOn真正发生订阅的方法是subscribeActual(Observer observer)。

 
   
   
 
  1.    @Override

  2.    public void subscribeActual(final Observer<? super T> s) {

  3.        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

  4.        s.onSubscribe(parent);

  5.        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

  6.    }

其中,SubscribeOnObserver是下游的Observer通过装饰器模式生成的。它实现了Observer、Disposable接口。

接下来,在上游的线程中执行下游Observer的onSubscribe(Disposable disposabel)方法。

 
   
   
 
  1. s.onSubscribe(parent);

然后,将子线程的操作加入Disposable管理中,加入Disposable后可以方便上下游的统一管理。

 
   
   
 
  1. parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));

在这里,已经调用对应scheduler的scheduleDirect()方法。scheduleDirect() 传入的是一个Runnable,也就是下面的SubscribeTask。

 
   
   
 
  1.    final class SubscribeTask implements Runnable {

  2.        private final SubscribeOnObserver<T> parent;

  3.        SubscribeTask(SubscribeOnObserver<T> parent) {

  4.            this.parent = parent;

  5.        }

  6.        @Override

  7.        public void run() {

  8.            source.subscribe(parent);

  9.        }

  10.    }

SubscribeTask会执行run()对上游的Observable进行订阅。

此时,已经在对应的Scheduler线程中运行了。

 
   
   
 
  1. source.subscribe(parent);

在RxJava的链式操作中,数据的处理是自下而上,这点跟数据发射正好相反。如果多次调用subscribeOn,最上面的线程切换最晚执行,所以变成了只有第一次切换线程才有效。

2.2 线程调度observeOn

observeOn同样接收一个Scheduler参数,用来指定下游操作运行在特定的线程调度器Scheduler上。

若多次执行observeOn,则每次均起作用,线程会一直切换。

点击observeOn()的源码可以看到,每次调用observeOn()都会创建一个ObservableObserveOn对象。

 
   
   
 
  1.    public final Observable<T> observeOn(Scheduler scheduler) {

  2.        return observeOn(scheduler, false, bufferSize());

  3.    }

  4.    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {

  5.        ObjectHelper.requireNonNull(scheduler, "scheduler is null");

  6.        ObjectHelper.verifyPositive(bufferSize, "bufferSize");

  7.        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));

  8.    }

ObservableObserveOn真正发生订阅的方法是subscribeActual(Observer observer)。

 
   
   
 
  1.    @Override

  2.    protected void subscribeActual(Observer<? super T> observer) {

  3.        if (scheduler instanceof TrampolineScheduler) {

  4.            source.subscribe(observer);

  5.        } else {

  6.            Scheduler.Worker w = scheduler.createWorker();

  7.            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));

  8.        }

  9.    }

如果scheduler是TrampolineScheduler,上游事件和下游事件会立即产生订阅。

如果不是的话,scheduler会创建自己的Worker,然后上游事件和下游事件产生订阅,生成一个ObserveOnObserver对象包装了下游真正的Observer。

ObserveOnObserver是ObservableObserveOn的内部类,实现了Observer、Runnable接口。跟SubscribeOnObserver不同的是,SubscribeOnObserver实现了Observer、Disposable接口。

在ObserveOnObserver的onNext()中,schedule()执行了具体调度的方法。

 
   
   
 
  1.        @Override

  2.        public void onNext(T t) {

  3.            if (done) {

  4.                return;

  5.            }

  6.            if (sourceMode != QueueDisposable.ASYNC) {

  7.                queue.offer(t);

  8.            }

  9.            schedule();

  10.        }

  11.        void schedule() {

  12.            if (getAndIncrement() == 0) {

  13.                worker.schedule(this);

  14.            }

  15.        }

其中,worker是当前scheduler创建的Worker,this指的是当前的ObserveOnObserver对象,this实现了Runnable接口。

然后,我们看看Runnable接口的实现方法run(),这个方法是在worker对应的线程里执行的。drainNormal()会取出 ObserveOnObserver 的 queue 里的数据进行发送。

 
   
   
 
  1.        @Override

  2.        public void run() {

  3.            if (outputFused) {

  4.                drainFused();

  5.            } else {

  6.                drainNormal();

  7.            }

  8.        }

下游多次调用observeOn()的话,线程会一直切换。每一次切换线程,都会把对应的Observer对象的各个方法的处理执行在指定的线程中。

三. 示例

举一个多次调用subscribeOn、observeOn的例子。

 
   
   
 
  1.        Observable.just("HELLO WORLD")

  2.                .subscribeOn(Schedulers.single())

  3.                .map(new Function<String, String>() {

  4.                    @Override

  5.                    public String apply(@NonNull String s) throws Exception {

  6.                        s = s.toLowerCase();

  7.                        L.i("map1",s);

  8.                        return s;

  9.                    }

  10.                })

  11.                .observeOn(Schedulers.io())

  12.                .map(new Function<String, String>() {

  13.                    @Override

  14.                    public String apply(String s) throws Exception {

  15.                        s = s+" tony.";

  16.                        L.i("map2",s);

  17.                        return s;

  18.                    }

  19.                })

  20.                .subscribeOn(Schedulers.computation())

  21.                .map(new Function<String, String>() {

  22.                    @Override

  23.                    public String apply(String s) throws Exception {

  24.                        s = s+"it is a test.";

  25.                        L.i("map3",s);

  26.                        return s;

  27.                    }

  28.                })

  29.                .observeOn(Schedulers.newThread())

  30.                .subscribe(new Consumer<String>() {

  31.                    @Override

  32.                    public void accept(@NonNull String s) throws Exception {

  33.                        L.i("subscribe",s);

  34.                        System.out.println(s);

  35.                    }

  36.                });



四. 总结

了解RxJava的线程模型、线程调度器、线程调度是非常有意义的。能够帮助我们更合理地使用RxJava。另外,RxJava的线程切换结合链式调用非常方便,比起Java使用线程操作实在是简单太多了。


关注【Java与Android技术栈】

更多精彩内容请关注



以上是关于RxJava 线程模型分析的主要内容,如果未能解决你的问题,请参考以下文章

理解RxJava线程模型

理解 RxJava 的线程模型(上)

理解 RxJava 的线程模型

RxJava中的doOnSubscribe默认运行线程分析

Rxjava 源码解析 - Schedulers默认线程池

Rxjava 源码解析 - Schedulers默认线程池