Rxjava 流程分析
Posted xzj_2013
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 流程分析相关的知识,希望对你有一定的参考价值。
Observable的创建流程分析
首先看一张Rxjava的时序图:
step1.Observable.create
在我们的使用案例中,第一步创建一个Observable对象,我们是通过Observable的Create方法来创建一个Observable实例, 我们就从这个方法开始深入源码分析Observeble的创建过程;
实质上 这就包括了上面时序图中的两步,先创建一个ObservableOnSubscribe(这是一个接口)实例,它是具有subscribe方法的函数接口,该方法接收ObservableEmitter实例,该实例允许以取消安全的方式推送事件
所以会通过new ObservableOnSubscribe()创建一个匿名内部类,里面就一个方法,也是我们实现的那个方法:
new ObservableOnSubscribe<String>()
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception
if (!emitter.isDisposed())
emitter.onNext("发射一条消息:");
emitter.onNext("你好!Rxjava");
emitter.onComplete();
ObservableEmitter也是一个接口。里面方法很多,它也继承了 Emitter 接口。
public interface ObservableEmitter<T> extends Emitter<T>
//设置事件流的控制器,用于控制事件流取消
void setDisposable(@Nullable Disposable d);
//类似Disposable的功能 取消事件
void setCancellable(@Nullable Cancellable c);
//判断是否中断了
boolean isDisposed();
//序列化
ObservableEmitter<T> serialize();
//尝试去发送一个错误的事件流
boolean tryOnError(@NonNull Throwable t);
public interface Emitter<T>
void onNext(T value);
void onError(Throwable error);
void onComplete();
Emitter定义了 我们在Observer里最常用的三个方法
其次才将该匿名内部类实例作为参数传递调用create方法
public abstract class Observable<T> implements ObservableSource<T>
......
@CheckReturnValue
@NonNull
@SchedulerSupport(SchedulerSupport.NONE)
//通过Observable的create方法创建一个冷的Observable 需要传一个ObservableOnSubscribe的参数
public static <T> Observable<T> create(ObservableOnSubscribe<T> source)
//非空断言 source不能为null,为null抛出异常
ObjectHelper.requireNonNull(source, "source is null");
//Observable是abstract的抽象类 ObservableCreate是其一个实现类
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
......
step2.new ObservableCreate(source)
创建一个Observable的实现类,这个类才是真正处理订阅 以及分发事件的类;
具体是怎么实现订阅等后续分析
step3.RxJavaPlugins.onAssembly(ObservableCreate)
/**
* Calls the associated hook function.
* 调用关联的钩子函数
* @param <T> the value type
* @param source the hook's input value
* @return the value returned by the hook
*/
@SuppressWarnings( "rawtypes", "unchecked" )
@NonNull
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source)
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null)
return apply(f, source);
return source;
显然通过注释我们知道,这是一个钩子函数,也就是说如果 onObservableAssembly 的值不为空,那么就调用这个钩子函数,onObservableAssembly 是一个静态变量,需要我们主动的去设置才会赋值,这里当做空来考虑,如果 onObservableAssembly 为空的话,也就是说这个方法啥都没做,直接返回 source 参数,也就是上面的 ObservableCreate 对象,在大多数情况下,这个函数返回的还是函数传入的参数source;
step4 返回Observeble
在经过前面三步,我们可以发现Observeble的create方法,在没有异常及Hook处理的情况下,返回的就是在step2中创建的ObservableCreate对象,也正是在这个类中我们可以观察到订阅以及对其他事件的处理
Rxjava的订阅流程以及事件处理分析
订阅流程时序图:
然后根据时序图我们从源码分析订阅流程:
step1:Observable.subscribe(Observer)
在对Rxjava的使用中就是通过调用该方法实现观察者订阅被观察者,那么具体是如何实现?
看该方法的源码实现:
public abstract class Observable<T> implements ObservableSource<T>
......
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer)
//非空断言 observer不能为NULL
ObjectHelper.requireNonNull(observer, "observer is null");
try
// 钩子函数,方便Hook实现
observer = RxJavaPlugins.onSubscribe(this, observer);
//因为如果存在Hook的话 可能会对observer修改,所以修改再进行一次非空断言
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//订阅
subscribeActual(observer);
catch (NullPointerException e) // NOPMD
throw e;
catch (Throwable e)
//抛出异常
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
......
step2:subscribeActual
直接上源码:
public abstract class Observable<T> implements ObservableSource<T>
......
/**
* Operator implementations (both source and intermediate) should implement this method that
* performs the necessary business logic and handles the incoming @link Observers.
* <p>There is no need to call any of the plugin hooks on the current @code Observable instance or
* the @code Observer; all hooks and basic safeguards have been
* applied by @link #subscribe(Observer) before this method gets called.
* @param observer the incoming Observer, never null
*/
protected abstract void subscribeActual(Observer<? super T> observer);
......
这是一个抽象方法,那么必然需要寻找对应实现该方法的子类;
看上面Observable的创建流程,返回的Observeble是一个ObservableCreate对象,因此我们去这个类中找真正的实现:
public final class ObservableCreate<T> extends Observable<T>
//这个就是创建流程中传递的ObservableOnSubscribe的一个匿名实现类
final ObservableOnSubscribe<T> source;
......
@Override
protected void subscribeActual(Observer<? super T> observer)
//创建了一个发射器CreateEmitter,这个CreateEmitter很眼熟 就是创建流程中ObservableEmitter的实现类,也就是负责具体的事件分发以及控制处理的类
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//通知观察者 已经产生了订阅
observer.onSubscribe(parent);
try
//回调通知被观察者订阅关系已经建立,并提供一个发射器工具发送数据
source.subscribe(parent);
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
parent.onError(ex);
......
step3:CreateEmitter实现
static final class CreateEmitter<T>
extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable
private static final long serialVersionUID = -3434801548987643227L;
//观察者对象
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer)
this.observer = observer;
@Override
public void onNext(T t)
//接收到被观察发送的oNext事件
if (t == null)
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
//如果没有Disposable 也就是控制流没有被中断
if (!isDisposed())
//通知观察 收到onNext事件
observer.onNext(t);
@Override
public void onError(Throwable t)
if (!tryOnError(t))
RxJavaPlugins.onError(t);
@Override
public boolean tryOnError(Throwable t)
if (t == null)
t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
if (!isDisposed())
try
observer.onError(t);
finally
dispose();
return true;
return false;
@Override
public void onComplete()
if (!isDisposed())
try
observer.onComplete();
finally
dispose();
@Override
public void setDisposable(Disposable d)
DisposableHelper.set(this, d);
@Override
public void setCancellable(Cancellable c)
setDisposable(new CancellableDisposable(c));
@Override
public ObservableEmitter<T> serialize()
return new SerializedEmitter<T>(this);
@Override
public void dispose()
DisposableHelper.dispose(this);
@Override
public boolean isDisposed()
return DisposableHelper.isDisposed(get());
@Override
public String toString()
return String.format("%s%s", getClass().getSimpleName(), super.toString());
这也就回答了上面为什么说ObserverCreate是真正实现事件分发处理的类,因为提供一个Emitter去完成这些工作;
从源码中我们可以看到主要就是两部分:
一部分是对Emitter功能的处理:
也就是Observer中最常用的三个方法,看实现也只是做了一层包装 做了一些对控制逻辑的判断 最终调用的还是观察者的onNext onComplete以及onError;
另外一部分就是对事件的逻辑控制:提供中断方法 提供设置Disposable 和Cancellable 实现对事件的控制,提供对事件中断的判断等。
step4:source.subscribe
订阅关系已经建立,通过创建被观察者时传入的回调接口通知被观察者,同时提供了一个发射器用于发射数据
总结:Rxjava的调用链
以上是关于Rxjava 流程分析的主要内容,如果未能解决你的问题,请参考以下文章