Rxjava 源码解析 - subscribe源码
Posted 许佳佳233
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 源码解析 - subscribe源码相关的知识,希望对你有一定的参考价值。
Rxjava源码解析系列:
Rxjava 源码解析(一) - subscribe源码
Rxjava 源码解析(二) - 线程切换源码
Rxjava 源码解析(三) - Schedulers默认线程池
概述
rxjava的应用还是较为广泛的,在实际项目中经常与MVP一起使用,可以使代码的可读性更高。在较为复杂的一些场景下,也可以使代码更加的简洁。
本文将会是对rxjava源码的探索,适合已经对rxjava的使用有一定经验的读者。
对MVP有兴趣的读者可以看下笔者的前文:
MVC、MVP、MVVM小记
同步Demo
主要逻辑如下:
- Observable调用create()创建 ObservableCreate
- ObservableCreate调用map,返回ObservableMap
- ObservableMap调用subscribe,执行最终的逻辑。
Observable.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable
Log.i("RxJavaTest", "subscribe");
emitter.onNext("123");
).map(new Function<String, String>()
@Override
public String apply(String s) throws Throwable
return s + "456";
).subscribe(new Observer<String>()
@Override
public void onSubscribe(@NonNull Disposable d)
Log.i("RxJavaTest", "onSubscribe");
@Override
public void onNext(@NonNull String s)
Log.i("RxJavaTest", "onNext: " + s);
@Override
public void onError(@NonNull Throwable e)
Log.i("RxJavaTest", "onError");
@Override
public void onComplete()
Log.i("RxJavaTest", "onComplete");
);
运行结果
RxJavaTest: onSubscribe
RxJavaTest: subscribe
RxJavaTest: onNext: 123456
onNext执行任务栈
此处由于都是在当前线程运行的,所以可以通过任务栈看到大概逻辑,如果是在其他线程执行的,则用打印任务栈的方式无法看到全貌。
读者如果有兴趣,可以通过打断点在android studio中看到任务栈
从执行subscribe开始,到最终调用到onNext的任务栈如下:
- ObservableMap.subscribe
- ObservableMap.subscribeActual
- ObservableCreate.subscribe
- ObservableCreate.subscribeActual
- MapObserver.onNext
- Observer.onNext
源码解析
Observable.subscribe
主要逻辑如下:
- 会调用RxJavaPlugins.onSubscribe 来执行hook的逻辑,这个逻辑不设置的话就没有。
- 然后就是会调用到subscribeActual方法,这个方法在Observable中是abstract方法,在它的子类中才会实现,比如ObservableMap和ObservableCreate都是它的子类。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer<? super T> observer)
Objects.requireNonNull(observer, "observer is null");
try
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
catch (NullPointerException e) // NOPMD
throw e;
catch (Throwable e)
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
ObservableMap.subscribeActual
主要逻辑:
- ObservableMap.subscribeActual最终调用到的是source.subscribe
- source是在构造函数的时候传进来的
- ObservableMap的构造函数,是在ObservableCreate的map方法中调用的
- 所以此处的source就是ObservableCreate,最终调用到的是ObservableCreate.subscribe
@Override
public void subscribeActual(Observer<? super U> t)
source.subscribe(new MapObserver<T, U>(t, function));
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T>
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source)
this.source = source;
@Override
public final ObservableSource<T> source()
return source;
public final <@NonNull R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper)
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
ObservableCreate.subscribeActual
ObservableCreate.subscribe调用的其实就是Observable.subscribe,这个前面讲到,最终会调用到ObservableCreate.subscribeActual。
主要逻辑:
- 首先会调用到Observer的onSubscribe逻辑
- 然后会在try-catch中执行subscribe
- 如果try-catch中出现异常,会调用 onError的逻辑
- 此处subscribeActual传进来的参数实际上是MapObserver,因此Demo中调用onNext方法的对象也是MapObserver
@Override
protected void subscribeActual(Observer<? super T> observer)
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);
try
source.subscribe(parent);
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
parent.onError(ex);
Observable.create(new ObservableOnSubscribe<String>()
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable
Log.i("RxJavaTest", "subscribe");
emitter.onNext("123");
)
ObservableMap.MapObserver.onNext
主要逻辑:
- 先通过mapper.apply将onNext参数处理
- 处理完成的数传给downstream的onNext处理
- 此处downstream其实就是Demo中subsrcibe的参数,具体的可以通过MapObserver和其父类BasicFuseableObserver的构造函数看出。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U>
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper)
super(actual);
this.mapper = mapper;
@Override
public void onNext(T t)
if (done)
return;
if (sourceMode != NONE)
downstream.onNext(null);
return;
U v;
try
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
catch (Throwable ex)
fail(ex);
return;
downstream.onNext(v);
————————————————省略
public BasicFuseableObserver(Observer<? super R> downstream)
this.downstream = downstream;
以上是关于Rxjava 源码解析 - subscribe源码的主要内容,如果未能解决你的问题,请参考以下文章