Rxjava 源码解析 - Schedulers默认线程池
Posted 许佳佳233
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 源码解析 - Schedulers默认线程池相关的知识,希望对你有一定的参考价值。
Rxjava源码解析系列:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码
Rxjava 源码解析(三) - Schedulers默认线程池
概述
前文已经分析过rxjava中我们常见用法的一些源码,还没有了解的读者推荐看下:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码
本文将分析下rxjava线程池相关的源码。
本文的内容大概有以下:
- Schedulers.io()源码
- 线程工厂RxThreadFactory源码
- 线程池CachedWorkerPool源码
- Worker的创建与调用
Schedulers.io()源码
rxjava内置的线程池除了io,还有newThead,single等,它们的逻辑都是类似的,本文就以io为例子来进行解析。
demo代码:
Observable
.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable
Log.i("RxJavaTest", "subscribe");
emitter.onNext("123");
).map(new Function<String, String>()
@Override
public String apply(String s) throws Throwable
return s + "456";
)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(new Observer<String>()
@Override
public void onSubscribe(@NonNull Disposable d)
Log.i("RxJavaTest", "onSubscribe");
@Override
public void onNext(@NonNull String s)
Log.i("RxJavaTest", "onNext: " + s);
@Override
public void onError(@NonNull Throwable e)
Log.i("RxJavaTest", "onError");
@Override
public void onComplete()
Log.i("RxJavaTest", "onComplete");
);
Schedulers.io()
RxJavaPlugins.onioscheduler是rxjava设置的代理,默认情况下没有,因此直接看下Schedulers.IO逻辑即可。
public static Scheduler io()
return RxJavaPlugins.onIoScheduler(IO);
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler)
Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
if (f == null)
return defaultScheduler;
return apply(f, defaultScheduler);
Schedulers.IO
IO是Schedulers的静态对象,在这个类被调用的时候会初始化。
static final Scheduler IO;
static
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
Schedulers.IOTask
IOTask中最终返回的是IoHolder.DEFAULT对象。
static final class IOTask implements Supplier<Scheduler>
@Override
public Scheduler get()
return IoHolder.DEFAULT;
Schedulers.IoHolder.DEFAULT
static final class IoHolder
static final Scheduler DEFAULT = new IoScheduler();
IoScheduler的构造
IoScheduler中的逻辑就比较关键了,主要有以下几点:
- IoScheduler存储了线程工厂RxThreadFactory和线程池CachedWorkerPool。
- 在IoScheduler构造函数中会去初始化这两个值
public IoScheduler()
this(WORKER_THREAD_FACTORY);
static final RxThreadFactory WORKER_THREAD_FACTORY;
static final CachedWorkerPool NONE;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;
public IoScheduler(ThreadFactory threadFactory)
this.threadFactory = threadFactory;
this.pool = new AtomicReference<>(NONE);
start();
static
————————————————省略
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
————————————————省略
线程工厂RxThreadFactory源码
RxThreadFactory就是一个普通的线程工厂,主要逻辑如下:
- 会使用prefix字段来作唯一标识
- 优先级是调用的地方传进来的
public final class RxThreadFactory extends AtomicLong implements ThreadFactory
private static final long serialVersionUID = -7789753024099756196L;
final String prefix;
final int priority;
final boolean nonBlocking;
public RxThreadFactory(String prefix)
this(prefix, Thread.NORM_PRIORITY, false);
public RxThreadFactory(String prefix, int priority)
this(prefix, priority, false);
public RxThreadFactory(String prefix, int priority, boolean nonBlocking)
this.prefix = prefix;
this.priority = priority;
this.nonBlocking = nonBlocking;
@Override
public Thread newThread(@NonNull Runnable r)
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
@Override
public String toString()
return "RxThreadFactory[" + prefix + "]";
static final class RxCustomThread extends Thread implements NonBlockingThread
RxCustomThread(Runnable run, String name)
super(run, name);
线程池CachedWorkerPool源码
CachedWorkerPool.get()
从CachedWorkerPool中获取Worker时,会先去expiringWorkerQueue中查看是否有缓存的Worker,如果没有的话就会传入ThreadFactory来创建一个新的Worker。
static final class CachedWorkerPool implements Runnable
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;
————————————————————省略
ThreadWorker get()
if (allWorkers.isDisposed())
return SHUTDOWN_THREAD_WORKER;
while (!expiringWorkerQueue.isEmpty())
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null)
return threadWorker;
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
————————————————————省略
IoScheduler.ThreadWorker
主要逻辑如下:
- 在创建ThreadWorker的时候,在它的父类NewThreadWorker中会创建ScheduledThreadPoolExecutor的线程池
- 在调用schedule方法处理事件时,会调用这个线程池来处理。
对ioScheduler的两点总结:
- IoScheduler的线程池与Worker是一一对应关系
- IoScheduler的一个线程工厂会对应多个Worker和多个线程池
(线程池和Worker的对应关系每个Scheduler是不一样的,如SingleScheduler中就是一个线程池对应所有的Worker,当然开发者也可以完全自定义Scheduler的逻辑)
static final class ThreadWorker extends NewThreadWorker
long expirationTime;
ThreadWorker(ThreadFactory threadFactory)
super(threadFactory);
this.expirationTime = 0L;
public long getExpirationTime()
return expirationTime;
public void setExpirationTime(long expirationTime)
this.expirationTime = expirationTime;
public NewThreadWorker(ThreadFactory threadFactory)
executor = SchedulerPoolFactory.create(threadFactory);
public static ScheduledExecutorService create(ThreadFactory factory)
final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
return exec;
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent)
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null)
if (!parent.add(sr))
return sr;
Future<?> f;
try
if (delayTime <= 0)
f = executor.submit((Callable<Object>)sr);
else
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
sr.setFuture(f);
catch (RejectedExecutionException ex)
if (parent != null)
parent.remove(sr);
RxJavaPlugins.onError(ex);
return sr;
Worker的创建与调用
Worker在rxjava中的使用到的地方非常多,可以说Worker就是rxjava实现线程切换的关键。
此处以ObservableObserveOn这个类为例来说下Worker的创建与使用。
如果不了解ObservableObserveOn,或者不了解rxjava常用方法的源码的读者,可以先看下前文:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码
ObservableObserveOn中使用Worker的主要逻辑:
- 在调用执行subscribe时,会创建Woker并传入ObserveOnObserver
- 当执行onNext等回调时,会调用Worker的schedule方法。这样就可以实现线程切换的操作
ObservableObserveOn.subscribeActual
protected void subscribeActual(Observer<? super T> observer)
if (scheduler instanceof TrampolineScheduler)
source.subscribe(observer);
else
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
ObservableObserveOn.ObserveOnObserver.onNext
public void onNext(T t)
if (done)
return;
if (sourceMode != QueueDisposable.ASYNC)
queue.offer(t);
schedule();
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize)
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
void schedule()
if (getAndIncrement() == 0)
worker.schedule(this);
以上是关于Rxjava 源码解析 - Schedulers默认线程池的主要内容,如果未能解决你的问题,请参考以下文章
RxJava/RxAndroid 中的 Schedulers.computation() v/s Schedulers.io()