Rxjava 源码解析 - subscribe源码
Posted 许佳佳233
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 源码解析 - subscribe源码相关的知识,希望对你有一定的参考价值。
概述
rxjava的应用还是较为广泛的,在实际项目中经常与MVP一起使用,可以使代码的可读性更高。在较为复杂的一些场景下,也可以使代码更加的简洁。
本文将会是对rxjava源码的探索,适合已经对rxjava的使用有一定经验的读者。
对MVP有兴趣的读者可以看下笔者的前文:
MVC、MVP、MVVM小记
同步Demo
主要逻辑如下:
- Observable调用create()创建 ObservableCreate
- ObservableCreate调用map,返回ObservableMap
- ObservableMap调用subscribe,执行最终的逻辑。
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
Log.i("RxJavaTest", "subscribe");
emitter.onNext("123");
}
}).map(new Function<String, String>() {
@Override
public String apply(String s) throws Throwable {
return s + "456";
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.i("RxJavaTest", "onSubscribe");
}
@Override
public void onNext(@NonNull String s) {
Log.i("RxJavaTest", "onNext: " + s);
}
@Override
public void onError(@NonNull Throwable e) {
Log.i("RxJavaTest", "onError");
}
@Override
public void onComplete() {
Log.i("RxJavaTest", "onComplete");
}
});
运行结果
RxJavaTest: onSubscribe
RxJavaTest: subscribe
RxJavaTest: onNext: 123456
onNext执行任务栈
此处由于都是在当前线程运行的,所以可以通过任务栈看到大概逻辑,如果是在其他线程执行的,则用打印任务栈的方式无法看到全貌。
读者如果有兴趣,可以通过打断点在android studio中看到任务栈
从执行subscribe开始,到最终调用到onNext的任务栈如下:
- ObservableMap.subscribe
- ObservableMap.subscribeActual
- ObservableCreate.subscribe
- ObservableCreate.subscribeActual
- MapObserver.onNext
- Observer.onNext
源码解析
Observable.subscribe
主要逻辑如下:
- 会调用RxJavaPlugins.onSubscribe 来执行hook的逻辑,这个逻辑不设置的话就没有。
- 然后就是会调用到subscribeActual方法,这个方法在Observable中是abstract方法,在它的子类中才会实现,比如ObservableMap和ObservableCreate都是它的子类。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
ObservableMap.subscribeActual
主要逻辑:
- ObservableMap.subscribeActual最终调用到的是source.subscribe
- source是在构造函数的时候传进来的
- ObservableMap的构造函数,是在ObservableCreate的map方法中调用的
- 所以此处的source就是ObservableCreate,最终调用到的是ObservableCreate.subscribe
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
public final <@NonNull R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
Objects.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
}
ObservableCreate.subscribeActual
ObservableCreate.subscribe调用的其实就是Observable.subscribe,这个前面讲到,最终会调用到ObservableCreate.subscribeActual。
主要逻辑:
- 首先会调用到Observer的onSubscribe逻辑
- 然后会在try-catch中执行subscribe
- 如果try-catch中出现异常,会调用 onError的逻辑
- 此处subscribeActual传进来的参数实际上是MapObserver,因此Demo中调用onNext方法的对象也是MapObserver
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
Log.i("RxJavaTest", "subscribe");
emitter.onNext("123");
}
})
ObservableMap.MapObserver.onNext
主要逻辑:
- 先通过mapper.apply将onNext参数处理
- 处理完成的数传给downstream的onNext处理
- 此处downstream其实就是Demo中subsrcibe的参数,具体的可以通过MapObserver和其父类BasicFuseableObserver的构造函数看出。
static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
final Function<? super T, ? extends U> mapper;
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);
this.mapper = mapper;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
————————————————省略
}
public BasicFuseableObserver(Observer<? super R> downstream) {
this.downstream = downstream;
}
以上是关于Rxjava 源码解析 - subscribe源码的主要内容,如果未能解决你的问题,请参考以下文章