Rxjava 流程分析

Posted xzj_2013

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 流程分析相关的知识,希望对你有一定的参考价值。

Observable的创建流程分析

首先看一张Rxjava的时序图:

step1.Observable.create
在我们的使用案例中,第一步创建一个Observable对象,我们是通过Observable的Create方法来创建一个Observable实例, 我们就从这个方法开始深入源码分析Observeble的创建过程;
实质上 这就包括了上面时序图中的两步,先创建一个ObservableOnSubscribe(这是一个接口)实例,它是具有subscribe方法的函数接口,该方法接收ObservableEmitter实例,该实例允许以取消安全的方式推送事件
所以会通过new ObservableOnSubscribe()创建一个匿名内部类,里面就一个方法,也是我们实现的那个方法:

         new ObservableOnSubscribe<String>() 
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception 
                if (!emitter.isDisposed())
                    emitter.onNext("发射一条消息:");
                    emitter.onNext("你好!Rxjava");
                
                emitter.onComplete();

            
        

ObservableEmitter也是一个接口。里面方法很多,它也继承了 Emitter 接口。


public interface ObservableEmitter<T> extends Emitter<T> 
    //设置事件流的控制器,用于控制事件流取消
	void setDisposable(@Nullable Disposable d);
	//类似Disposable的功能  取消事件
	void setCancellable(@Nullable Cancellable c);
	//判断是否中断了
	boolean isDisposed();
	//序列化
	ObservableEmitter<T> serialize();
	//尝试去发送一个错误的事件流
	boolean tryOnError(@NonNull Throwable t);


public interface Emitter<T> 
	void onNext(T value);
	void onError(Throwable error);
	void onComplete();

Emitter定义了 我们在Observer里最常用的三个方法

其次才将该匿名内部类实例作为参数传递调用create方法

public abstract class Observable<T> implements ObservableSource<T> 
	......
 @CheckReturnValue
    @NonNull
    @SchedulerSupport(SchedulerSupport.NONE)
    //通过Observable的create方法创建一个冷的Observable 需要传一个ObservableOnSubscribe的参数
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) 
    	//非空断言 source不能为null,为null抛出异常
        ObjectHelper.requireNonNull(source, "source is null");
        //Observable是abstract的抽象类  ObservableCreate是其一个实现类
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    
	......

step2.new ObservableCreate(source)
创建一个Observable的实现类,这个类才是真正处理订阅 以及分发事件的类;
具体是怎么实现订阅等后续分析

step3.RxJavaPlugins.onAssembly(ObservableCreate)

 /**
     * Calls the associated hook function.
     * 调用关联的钩子函数
     * @param <T> the value type
     * @param source the hook's input value
     * @return the value returned by the hook
     */
    @SuppressWarnings( "rawtypes", "unchecked" )
    @NonNull
    public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) 
        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
        if (f != null) 
            return apply(f, source);
        
        return source;
    

显然通过注释我们知道,这是一个钩子函数,也就是说如果 onObservableAssembly 的值不为空,那么就调用这个钩子函数,onObservableAssembly 是一个静态变量,需要我们主动的去设置才会赋值,这里当做空来考虑,如果 onObservableAssembly 为空的话,也就是说这个方法啥都没做,直接返回 source 参数,也就是上面的 ObservableCreate 对象,在大多数情况下,这个函数返回的还是函数传入的参数source;

step4 返回Observeble
在经过前面三步,我们可以发现Observeble的create方法,在没有异常及Hook处理的情况下,返回的就是在step2中创建的ObservableCreate对象,也正是在这个类中我们可以观察到订阅以及对其他事件的处理

Rxjava的订阅流程以及事件处理分析

订阅流程时序图:

然后根据时序图我们从源码分析订阅流程:
step1:Observable.subscribe(Observer)
在对Rxjava的使用中就是通过调用该方法实现观察者订阅被观察者,那么具体是如何实现?
看该方法的源码实现:

public abstract class Observable<T> implements ObservableSource<T> 
	......
	@SchedulerSupport(SchedulerSupport.NONE)
	@Override
	public final void subscribe(Observer<? super T> observer) 
	     //非空断言 observer不能为NULL
	     ObjectHelper.requireNonNull(observer, "observer is null");
	     try 
	     	// 钩子函数,方便Hook实现
            observer = RxJavaPlugins.onSubscribe(this, observer);
            //因为如果存在Hook的话 可能会对observer修改,所以修改再进行一次非空断言
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            //订阅
            subscribeActual(observer);
         catch (NullPointerException e)  // NOPMD
            throw e;
         catch (Throwable e) 
            //抛出异常
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        
	
	......

step2:subscribeActual
直接上源码:

public abstract class Observable<T> implements ObservableSource<T> 
	......
	 /**
     * Operator implementations (both source and intermediate) should implement this method that
     * performs the necessary business logic and handles the incoming @link Observers.
     * <p>There is no need to call any of the plugin hooks on the current @code Observable instance or
     * the @code Observer; all hooks and basic safeguards have been
     * applied by @link #subscribe(Observer) before this method gets called.
     * @param observer the incoming Observer, never null
     */
    protected abstract void subscribeActual(Observer<? super T> observer);
	......

这是一个抽象方法,那么必然需要寻找对应实现该方法的子类;
看上面Observable的创建流程,返回的Observeble是一个ObservableCreate对象,因此我们去这个类中找真正的实现:

public final class ObservableCreate<T> extends Observable<T> 
    //这个就是创建流程中传递的ObservableOnSubscribe的一个匿名实现类
    final ObservableOnSubscribe<T> source;
	......
	@Override
    protected void subscribeActual(Observer<? super T> observer) 
        //创建了一个发射器CreateEmitter,这个CreateEmitter很眼熟 就是创建流程中ObservableEmitter的实现类,也就是负责具体的事件分发以及控制处理的类
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //通知观察者 已经产生了订阅
        observer.onSubscribe(parent);
        try 
            //回调通知被观察者订阅关系已经建立,并提供一个发射器工具发送数据
            source.subscribe(parent);
         catch (Throwable ex) 
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        
    
	......

step3:CreateEmitter实现

static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable 
        private static final long serialVersionUID = -3434801548987643227L;
        //观察者对象
        final Observer<? super T> observer;
        CreateEmitter(Observer<? super T> observer) 
            this.observer = observer;
        
        @Override
        public void onNext(T t) 
            //接收到被观察发送的oNext事件
            if (t == null) 
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            
            //如果没有Disposable 也就是控制流没有被中断
            if (!isDisposed()) 
                //通知观察 收到onNext事件
                observer.onNext(t);
            
        

        @Override
        public void onError(Throwable t) 
            if (!tryOnError(t)) 
                RxJavaPlugins.onError(t);
            
        
        @Override
        public boolean tryOnError(Throwable t) 
            if (t == null) 
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            
            if (!isDisposed()) 
                try 
                    observer.onError(t);
                 finally 
                    dispose();
                
                return true;
            
            return false;
        
        @Override
        public void onComplete() 
            if (!isDisposed()) 
                try 
                    observer.onComplete();
                 finally 
                    dispose();
                
            
        
       @Override
        public void setDisposable(Disposable d) 
            DisposableHelper.set(this, d);
        
        @Override
        public void setCancellable(Cancellable c) 
            setDisposable(new CancellableDisposable(c));
        
        @Override
        public ObservableEmitter<T> serialize() 
            return new SerializedEmitter<T>(this);
        
        @Override
        public void dispose() 
            DisposableHelper.dispose(this);
        

        @Override
        public boolean isDisposed() 
            return DisposableHelper.isDisposed(get());
        

        @Override
        public String toString() 
            return String.format("%s%s", getClass().getSimpleName(), super.toString());
        
    

这也就回答了上面为什么说ObserverCreate是真正实现事件分发处理的类,因为提供一个Emitter去完成这些工作;
从源码中我们可以看到主要就是两部分:
一部分是对Emitter功能的处理:
也就是Observer中最常用的三个方法,看实现也只是做了一层包装 做了一些对控制逻辑的判断 最终调用的还是观察者的onNext onComplete以及onError;
另外一部分就是对事件的逻辑控制:提供中断方法 提供设置Disposable 和Cancellable 实现对事件的控制,提供对事件中断的判断等。

step4:source.subscribe
订阅关系已经建立,通过创建被观察者时传入的回调接口通知被观察者,同时提供了一个发射器用于发射数据

总结:Rxjava的调用链

以上是关于Rxjava 流程分析的主要内容,如果未能解决你的问题,请参考以下文章

RxJava2.0中fromArray操作符用法和源码分析

RxJava之错误处理

Rxjava上手

RxJava concatMap操作符

Rxjava Subject分析

RxJava2.0中map操作符用法和源码分析