Rxjava 源码解析 - Schedulers默认线程池
Posted 许佳佳233
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 源码解析 - Schedulers默认线程池相关的知识,希望对你有一定的参考价值。
Rxjava源码解析系列:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码
Rxjava 源码解析(三) - Schedulers默认线程池
概述
前文已经分析过rxjava中我们常见用法的一些源码,还没有了解的读者推荐看下:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码
本文将分析下rxjava线程池相关的源码。
本文的内容大概有以下:
- Schedulers.io()源码
- 线程工厂RxThreadFactory源码
- 线程池CachedWorkerPool源码
- Worker的创建与调用
Schedulers.io()源码
rxjava内置的线程池除了io,还有newThead,single等,它们的逻辑都是类似的,本文就以io为例子来进行解析。
demo代码:
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
Log.i("RxJavaTest", "subscribe");
emitter.onNext("123");
}
}).map(new Function<String, String>() {
@Override
public String apply(String s) throws Throwable {
return s + "456";
}
})
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i("RxJavaTest", "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.i("RxJavaTest", "onNext: " + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.i("RxJavaTest", "onError");
}
@Override
public void onComplete() {
Log.i("RxJavaTest", "onComplete");
}
});
Schedulers.io()
RxJavaPlugins.onioscheduler是rxjava设置的代理,默认情况下没有,因此直接看下Schedulers.IO逻辑即可。
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
if (f == null) {
return defaultScheduler;
}
return apply(f, defaultScheduler);
}
Schedulers.IO
IO是Schedulers的静态对象,在这个类被调用的时候会初始化。
static final Scheduler IO;
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
Schedulers.IOTask
IOTask中最终返回的是IoHolder.DEFAULT对象。
static final class IOTask implements Supplier<Scheduler> {
@Override
public Scheduler get() {
return IoHolder.DEFAULT;
}
}
Schedulers.IoHolder.DEFAULT
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
IoScheduler的构造
IoScheduler中的逻辑就比较关键了,主要有以下几点:
- IoScheduler存储了线程工厂RxThreadFactory和线程池CachedWorkerPool。
- 在IoScheduler构造函数中会去初始化这两个值
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
static final RxThreadFactory WORKER_THREAD_FACTORY;
static final CachedWorkerPool NONE;
final ThreadFactory threadFactory;
final AtomicReference<CachedWorkerPool> pool;
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<>(NONE);
start();
}
static {
————————————————省略
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
NONE.shutdown();
————————————————省略
}
线程工厂RxThreadFactory源码
RxThreadFactory就是一个普通的线程工厂,主要逻辑如下:
- 会使用prefix字段来作唯一标识
- 优先级是调用的地方传进来的
public final class RxThreadFactory extends AtomicLong implements ThreadFactory {
private static final long serialVersionUID = -7789753024099756196L;
final String prefix;
final int priority;
final boolean nonBlocking;
public RxThreadFactory(String prefix) {
this(prefix, Thread.NORM_PRIORITY, false);
}
public RxThreadFactory(String prefix, int priority) {
this(prefix, priority, false);
}
public RxThreadFactory(String prefix, int priority, boolean nonBlocking) {
this.prefix = prefix;
this.priority = priority;
this.nonBlocking = nonBlocking;
}
@Override
public Thread newThread(@NonNull Runnable r) {
StringBuilder nameBuilder = new StringBuilder(prefix).append('-').append(incrementAndGet());
String name = nameBuilder.toString();
Thread t = nonBlocking ? new RxCustomThread(r, name) : new Thread(r, name);
t.setPriority(priority);
t.setDaemon(true);
return t;
}
@Override
public String toString() {
return "RxThreadFactory[" + prefix + "]";
}
static final class RxCustomThread extends Thread implements NonBlockingThread {
RxCustomThread(Runnable run, String name) {
super(run, name);
}
}
线程池CachedWorkerPool源码
CachedWorkerPool.get()
从CachedWorkerPool中获取Worker时,会先去expiringWorkerQueue中查看是否有缓存的Worker,如果没有的话就会传入ThreadFactory来创建一个新的Worker。
static final class CachedWorkerPool implements Runnable {
private final long keepAliveTime;
private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
final CompositeDisposable allWorkers;
private final ScheduledExecutorService evictorService;
private final Future<?> evictorTask;
private final ThreadFactory threadFactory;
————————————————————省略
ThreadWorker get() {
if (allWorkers.isDisposed()) {
return SHUTDOWN_THREAD_WORKER;
}
while (!expiringWorkerQueue.isEmpty()) {
ThreadWorker threadWorker = expiringWorkerQueue.poll();
if (threadWorker != null) {
return threadWorker;
}
}
// No cached worker found, so create a new one.
ThreadWorker w = new ThreadWorker(threadFactory);
allWorkers.add(w);
return w;
}
————————————————————省略
}
IoScheduler.ThreadWorker
主要逻辑如下:
- 在创建ThreadWorker的时候,在它的父类NewThreadWorker中会创建ScheduledThreadPoolExecutor的线程池
- 在调用schedule方法处理事件时,会调用这个线程池来处理。
对ioScheduler的两点总结:
- IoScheduler的线程池与Worker是一一对应关系
- IoScheduler的一个线程工厂会对应多个Worker和多个线程池
(线程池和Worker的对应关系每个Scheduler是不一样的,如SingleScheduler中就是一个线程池对应所有的Worker,当然开发者也可以完全自定义Scheduler的逻辑)
static final class ThreadWorker extends NewThreadWorker {
long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
public long getExpirationTime() {
return expirationTime;
}
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
public static ScheduledExecutorService create(ThreadFactory factory) {
final ScheduledThreadPoolExecutor exec = new ScheduledThreadPoolExecutor(1, factory);
exec.setRemoveOnCancelPolicy(PURGE_ENABLED);
return exec;
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
Worker的创建与调用
Worker在rxjava中的使用到的地方非常多,可以说Worker就是rxjava实现线程切换的关键。
此处以ObservableObserveOn这个类为例来说下Worker的创建与使用。
如果不了解ObservableObserveOn,或者不了解rxjava常用方法的源码的读者,可以先看下前文:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码
ObservableObserveOn中使用Worker的主要逻辑:
- 在调用执行subscribe时,会创建Woker并传入ObserveOnObserver
- 当执行onNext等回调时,会调用Worker的schedule方法。这样就可以实现线程切换的操作
ObservableObserveOn.subscribeActual
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
ObservableObserveOn.ObserveOnObserver.onNext
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
以上是关于Rxjava 源码解析 - Schedulers默认线程池的主要内容,如果未能解决你的问题,请参考以下文章
RxJava/RxAndroid 中的 Schedulers.computation() v/s Schedulers.io()