Rxjava 源码解析 - 线程切换源码
Posted 许佳佳233
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 源码解析 - 线程切换源码相关的知识,希望对你有一定的参考价值。
Rxjava源码解析系列:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码
Rxjava 源码解析(三) - Schedulers默认线程池
概述
前文已经讲了rxjava 简单subscribe的源码,有兴趣的读者可以看下。
Rxjava 源码解析(一) - subscribe源码
本文将基于前文的分析,继续讲下rxjava中断线程切换。
Demo
主要逻辑如下:
- Observable调用create()创建 ObservableCreate
- ObservableCreate调用map,返回ObservableMap
- ObservableMap调用subscribeOn,返回ObservableSubscribeOn
- 返回ObservableSubscribeOn调用observeOn,返回ObservableSubscribeOn
- ObservableSubscribeOn调用subscribe,执行最终的逻辑。
Observable
.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable
Log.i("RxJavaTest", "subscribe");
emitter.onNext("123");
).map(new Function<String, String>()
@Override
public String apply(String s) throws Throwable
return s + "456";
)
.subscribeOn(Schedulers.io())
.observeOn(Schedulers.io())
.subscribe(new Observer<String>()
@Override
public void onSubscribe(@NonNull Disposable d)
Log.i("RxJavaTest", "onSubscribe");
@Override
public void onNext(@NonNull String s)
Log.i("RxJavaTest", "onNext: " + s);
@Override
public void onError(@NonNull Throwable e)
Log.i("RxJavaTest", "onError");
@Override
public void onComplete()
Log.i("RxJavaTest", "onComplete");
);
运行结果
RxJavaTest: onSubscribe
RxJavaTest: subscribe
RxJavaTest: onNext: 123456
原理图
上图是根据demo整理的关键逻辑的流程图。
主要内容有以下几点:
- 每个Observable会对应一个Observer,Observer是Observable的内部类,比如MapObserver就是ObservableMap的内部类。
- 当前的Observable会存储上一个Observable,即source对象。比如在Demo
中ObservableMap的source就是ObservableCreate。 - Observer会存储上一个Observer,即downstream。比如在Demo中,MapObserver的downstream就是CreateEmitter。
- 从调用第一个subscribe方法起,会一层一层的去调用Observable的subscribe方法,最终调用到的是Observable的subscribeActual方法,这个每个子类的逻辑不一样。
比如ObservableObserveOn的subscribeActual方法就会去获取到执行Observer的Worker然后传给source。
再比如ObservableMap会将定义map方法的Function对象传给source。 - 此处举例使用的是Observer.onNext方法,当然也可以是onError等其他方法。每个Observer会引用上一个Observer,即downstream。在Demo中最后一个Observer就是ObserveOnObserver。
每个Observer对onNext等方法的处理逻辑也都是不一样的,还是举例来说明。
比如MapObserver的onNext中,就是先将结果执行map中的逻辑,然后再传给downstream的onNext方法处理。
比如ObserveOnObserver的onNext中,会将source的onNext方法抛到Worker中去处理,或者说抛到具体的某个线程中去处理。
关键源码解析
Observable.subscribe
在Observable的subscribe方法中,最终会调用到subscribeActual方法,subscribeActual方法是一个虚方法,会在子类中实现
public final void subscribe(@NonNull Observer<? super T> observer)
Objects.requireNonNull(observer, "observer is null");
try
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
catch (NullPointerException e) // NOPMD
throw e;
catch (Throwable e)
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
Observable.subscribeOn
subscribeOn最终创建的是ObservableSubscribeOn对象。
public final Observable<T> subscribeOn(@NonNull Scheduler scheduler)
Objects.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
ObservableSubscribeOn.subscribeActual
subscribeOn这个方法是来决定订阅时执行的线程,因此在此处就需要抛到对应的线程去运行。
在subscribeActual中,最终会调用到Scheduler.scheduleDirect方法,在其中会将Observer中的逻辑放到新建的Worker中运行,即放到Scheduler设置的线程中运行。
public void subscribeActual(final Observer<? super T> observer)
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
public Disposable scheduleDirect(@NonNull Runnable run)
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit)
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
ObservableSubscribeOn。SubscribeOnObserver.onNext
SubscribeOnObserver的onnext就只是调用了下上一个Observer的onNext方法。
此处有的读者可以会有点混乱,额外解释下。
Observable.subscribeOn:定义订阅时执行的线程
Observable.observeOn: 定义监听时执行的线程
public void onNext(T t)
downstream.onNext(t);
Observable.observeOn
observeOn方法最终返回的是ObservableObserveOn对象。
public final Observable<T> observeOn(@NonNull Scheduler scheduler)
return observeOn(scheduler, false, bufferSize());
public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize)
Objects.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
ObservableObserveOn.subscribeActual
此处会创建Worker,即定义Observer要运行的线程,然后创建Observer传给source.subscribe方法。
protected void subscribeActual(Observer<? super T> observer)
if (scheduler instanceof TrampolineScheduler)
source.subscribe(observer);
else
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
ObservableObserveOn.ObserveOnObserver.onNext
observeOn这个方法是来决定监听时执行的线程,因此会在Observer的方法中,将逻辑抛到对应的线程中执行。
此处最终会调用到worker.schedule方法,将逻辑放到对应的线程中去执行。
public void onNext(T t)
if (done)
return;
if (sourceMode != QueueDisposable.ASYNC)
queue.offer(t);
schedule();
void schedule()
if (getAndIncrement() == 0)
worker.schedule(this);
以上是关于Rxjava 源码解析 - 线程切换源码的主要内容,如果未能解决你的问题,请参考以下文章