从源码的角度去探索RxJava笔记

Posted 巨头之路

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了从源码的角度去探索RxJava笔记相关的知识,希望对你有一定的参考价值。

RxJava是一个异步操作库,在线程切换上有很大的优势,以链式的Api方式来进行线程的切换,以及提供一系列操作符,使得代码变得简洁,维护起来也相对简单,可以避免回调地狱或迷之缩进.

本篇记录RxJava订阅操作的源码分析

基本API

先看下RxJava最基本的Api使用


	Observable.create(new ObservableOnSubscribe<String>()   //创建被观察者
	    @Override
	    public void subscribe(ObservableEmitter<String>emitter) throws Exception 
	        emitter.onNext("1");
	        emitter.onNext("2");
	        emitter.onNext("3");
	        emitter.onComplete();
	    
	)
	.subscribeOn(Schedulers.io())  		  //指定subscribe 发生在io线程	
	.observeOn(androidSchedulers.mainThread())    //指定回调发生在主线程
	.subscribe(new Observer<String>()      //订阅操作
	    @Override
	    public void onSubscribe(Disposable d) 
	        Log.d(TAG, "onSubscribe");
	    
	    @Override
	    public void onNext(String s) 
	        Log.d(TAG, "onNext : " + s);
	    
	    @Override
	    public void onError(Throwable e) 
	        Log.d(TAG, "onError : " + e.toString());
	    
	    @Override
	    public void onComplete() 
	        Log.d(TAG, "onComplete");
	    
	);

RxJava是基于观察者模式的,在上面的Api中,主角是Observable和Observer,Observable是被观察者,Observer是观察者,那不就是被观察者订阅[subscribe] 了观察者,是不是觉得有问题,不是应该观察者订阅被观察者嘛?

其实从观察者模式来说,确实应该是观察者订阅被观察者,但这里是从Api的角度来看,RxJava之所以这么做,主要是链式API的设计.

在上面使用的Api中,RxJava主要做了切换线程和订阅操作,在本篇笔记中,先不考虑线程切换,从源码的角度来看RxJava是如何订阅的?被观察者又是如何传递消息给观察者,看下面订阅的Api


	
	Observable.create(new ObservableOnSubscribe<String>()   //创建被观察者
	    @Override
	    public void subscribe(ObservableEmitter<String>emitter) throws Exception 
	        emitter.onNext("1");  //调用到观察者的onNext函数
	        emitter.onNext("2");
	        emitter.onNext("3");
	        emitter.onComplete(); //调用到观察者的onComplete函数
	    
	)
	.subscribe(new Observer<String>()      //订阅操作
	    @Override
	    public void onSubscribe(Disposable d)  
	        Log.d(TAG, "onSubscribe");
	    
	    @Override
	    public void onNext(String s) 
	        Log.d(TAG, "onNext : " + s);
	    
	    @Override
	    public void onError(Throwable e) 
	        Log.d(TAG, "onError : " + e.toString());
	    
	    @Override
	    public void onComplete() 
	        Log.d(TAG, "onComplete");
	    
	);


源码解析

本文RxJava源码基于版本3.x

1.创建被观察者: Observable.create()


	public abstract class Observable<@NonNull T> implements ObservableSource<T> 

		//create函数的返回值是ObservableCreate实例
		public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source)   

	        Objects.requireNonNull(source, "source is null");

	        //RxJavaPlugins.onAssembly是一个hook的抽象代理, 返回值是ObservableCreate实例
	        return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
	    

	


	public final class RxJavaPlugins 

		//source = ObservableCreate
		public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) 
	        Function<? super Observable, ? extends Observable> f = onObservableAssembly;
	        if (f != null) 
	            return apply(f, source);
	        
	        return source;
	    
	

从Observable开始,看上面代码中的注释,可以看到create函数的返回值是ObservableCreate这个实例,ObservableCreate类的构造函数的参数就是 此处create函数的参数ObservableOnSubscribe,即在ObservableCreate类的成员变量 source = ObservableOnSubscribe实例

2.订阅操作: subscribe()


	public abstract class Observable<@NonNull T> implements ObservableSource<T> 

		public final Disposable subscribe(@NonNull Consumer<? super T> onNext) 
			//重载函数,调用下面的subscribe函数
	        return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION);
	    


		public final Disposable subscribe(@NonNull Consumer<? super T> onNext, @NonNull Consumer<? super Throwable> onError,@NonNull Action onComplete) 

	        Objects.requireNonNull(onNext, "onNext is null");
	        Objects.requireNonNull(onError, "onError is null");
	        Objects.requireNonNull(onComplete, "onComplete is null");
	
	        LambdaObserver<T> ls = new LambdaObserver<>(onNext, onError, onComplete, Functions.emptyConsumer());
	
			//调用下面的subscribe函数
	        subscribe(ls);
	
	        return ls;
	    

		
		//按照当前Api链路分析,这个observer是开发者实现的观察者
		public final void subscribe(@NonNull Observer<? super T> observer) 

	        Objects.requireNonNull(observer, "observer is null");
	        try 

				//hook的抽象代理
	            observer = RxJavaPlugins.onSubscribe(this, observer);
	
				//...省略代码	            

	            //subscribeActual是个抽象函数,调用到ObservableCreate类的subscribeActual函数
	            subscribeActual(observer);
	        
				//...省略代码
	    


		protected abstract void subscribeActual(@NonNull Observer<? super T> observer);

	
	

根据上面1的分析,create函数返回的是ObservableCreate实例,而subscribeActual是个抽象函数,所以其实现方法在ObservableCreate实例里


	public final class ObservableCreate<T> extends Observable<T> 

		final ObservableOnSubscribe<T> source;
	
	    public ObservableCreate(ObservableOnSubscribe<T> source) 
	        //这里的source = 开发者调用observable.create函数时传入的ObservableOnSubscribe实例
	        this.source = source;
	    
	
	    @Override
	    protected void subscribeActual(Observer<? super T> observer) 
	        //这个CreateEmitter是观察者和被观察者通讯的核心,内部包装了observer这个观察者
	        //按照我们当前链路[订阅,不考虑线程切换]的分析,这个observer = 开发者在调用subscribe函数时传入的observer
	        CreateEmitter<T> parent = new CreateEmitter<>(observer);
	        //告诉观察者已经成功订阅了被观察者
	        observer.onSubscribe(parent);  //按照我们当前链路[订阅,不考虑线程切换]的分析,,这里的observer = SubscribeOnObserver
	
	        try 
	            //结合上面的source,这里就调用到开发者在调用Observable.create函数时
	            //传入的ObservableOnSubscribe实例的subscribe函数
	            source.subscribe(parent);
	         catch (Throwable ex) 
	            Exceptions.throwIfFatal(ex);
	            parent.onError(ex);
	        
	    
	
	

看上面的代码和注释,subscribeActual函数主要做了两个操作,结合下面的Api来看

  • 调用到下面的代码注释1,告诉观察者已经成功订阅了被观察者
  • 创建CreateEmitter[ 观察者和被观察者通讯的核心 ], 调用到下面的代码注释2, ObservableOnSubscribe实例的subscribe函数

	Observable.create(new ObservableOnSubscribe<String>()   //创建被观察者
	    @Override
		//2
	    public void subscribe(ObservableEmitter<String>emitter) throws Exception 
	        emitter.onNext("1");  //调用到观察者的onNext函数
	        emitter.onNext("2");
	        emitter.onNext("3");
	        emitter.onComplete(); //调用到观察者的onComplete函数
	    
	)
	.subscribe(new Observer<String>()      //订阅操作
	    @Override
	    public void onSubscribe(Disposable d)   //1.告诉观察者已经成功订阅了被观察者
	        Log.d(TAG, "onSubscribe");
	    
	    @Override
	    public void onNext(String s) 
	        Log.d(TAG, "onNext : " + s);
	    
	    @Override
	    public void onError(Throwable e) 
	        Log.d(TAG, "onError : " + e.toString());
	    
	    @Override
	    public void onComplete() 
	        Log.d(TAG, "onComplete");
	    
	);

订阅操作最后会走到ObservableOnSubscribe实例的subscribe函数,而CreateEmitter类实现了ObservableEmitter接口,所以emitter参数的实现实例是CreateEmitter, 在subscribe函数函数中调用了CreateEmitter的 onNext和onComplete 函数, 接着看CreateEmitter的 onNext和onComplete 函数


	public final class ObservableCreate<T> extends Observable<T> 

		static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable 

			final Observer<? super T> observer;

	        CreateEmitter(Observer<? super T> observer) 
	            this.observer = observer;   //observer = SubscribeOnObserver
	        
	
	        @Override
	        public void onNext(T t) 
	            if (t == null) 
	                onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
	                return;
	            
	            if (!isDisposed())  //isDisposed是订阅标识,未取消订阅则进入下面的逻辑
	                //调用到观察者的onNext函数
	                observer.onNext(t);
	            
	        
	
	        @Override
	        public void onError(Throwable t) 
	            //未取消订阅则返回true,取消订阅则返回false
	            if (!tryOnError(t)) 
	                //框架会自己捕捉异常,如果开发者有设置全局的UncaughtExceptionHandler,框架自身会传递过去,
	                // 如果开发者未设置,则抛出异常,会导致崩溃
	                RxJavaPlugins.onError(t);
	            
	        
	
	        //未取消订阅则返回true,取消订阅则返回false
	        @Override
	        public boolean tryOnError(Throwable t) 
	            if (t == null) 
	                t = ExceptionHelper.createNullPointerException("onError called with a null Throwable.");
	            
	            if (!isDisposed()) //isDisposed是订阅标识,未取消订阅则进入下面的逻辑
	                try 
	                    observer.onError(t); //调用到观察者的onError函数
	                 finally 
	                    dispose(); //取消订阅
	                
	                return true;
	            
	            return false;
	        
	
	        @Override
	        public void onComplete() 
	            if (!isDisposed()) //isDisposed是订阅标识,未取消订阅则进入下面的逻辑
	                try 
	                    //调用到观察者的onComplete函数
	                    observer.onComplete();
	                 finally 
	                    dispose();//取消订阅
	                
	            
	        

			
			@Override
	        public boolean isDisposed() 
	            //原子引用类AtomicReference的get函数, 解决并发读写问题
	            return DisposableHelper.isDisposed(get());
	        
		
		
	




	public enum DisposableHelper implements Disposable 

		//取消订阅操作
		public static boolean dispose(AtomicReference<Disposable> field) 
	        Disposable current = field.get();
	        Disposable d = DISPOSED;
	        if (current != d) 
	            //使用了原子引用AtomicReference内部包装的CAS方法处理了标志Disposable的并发读写问题
	            current = field.getAndSet(d);
	            if (current != d) 
	                if (current != null) 
	                    current.dispose();
	                
	                return true;
	            
	        
	        return false;
	    

		

总结下上面代码和注释:

  • 在ObservableCreate的onNext函数中,会判断订阅标识是否已经取消,没有取消的话则调用observer.onNext, 这个onNext即开发者实现的observer的onNext函数

  • 在ObservableCreate的onComplete中,会判断订阅标识是否已经取消,没有取消的话则调用observer.onComplete,这个onComplete即开发者实现的observer的onComplete函数;最后则进行取消订阅操作,防止内存泄漏

  • 在其onError函数中首先会判断订阅标识是否已经取消,取消了的话则调用RxJavaPlugins进行抛出异常[这里如果有设置全局异常捕捉器的话,则会将异常传递给全局的捕捉器];未取消的话则调用observer.onError(t),这个onError即开发者实现的observer的onError函数;最后则进行取消订阅操作,防止内存泄漏

  • dispose函数即取消订阅操作,此处使用了原子引用AtomicReference内部包装的CAS方法处理了标志Disposable的并发读写问题

如此这般,则完成了订阅操作的源码流程分析.最后总结下RxJava是如何订阅的?被观察者又是如何传递消息给观察者

RxJava是如何实现观察者订阅被观察者的?

通过ObservableCreate类和其内部的CreateEmitter类关联上的

  • 1.在执行Observable.create函数时,将ObservableOnSubscribe包装到ObservableCreate类中,即ObservableCreate的成员变量source=ObservableOnSubscribe,最后返回值是ObservableCreate

  • 2.接着执行subscribe(observer)函数进行订阅(核心在ObservableCreate的subscribeActual函数),先创建以observer为参数的CreateEmitter实例,接着调用observer的onSubscribe函数,这里是告诉观察者已经成功订阅了被观察者;最后调用上面的ObservableOnSubscribe的subscribe函数,并传递CreateEmitter实例过去

到这里,Observer就已经关联上Observable,通过CreateEmitter这个核心类,接着看createEmitter是如何发送消息的?

被观察者又是如何传递消息给观察者

  • 3.接上面1、2步,这时程序就走到我们实例的ObservableOnSubscribe实例的subscribe函数,我们在上面的subscribe函数内调用emitter.onNext(“1”)、emitter.onComplete() 函数.

  • 4.在CreateEmitter的onNext、onComplete()函数中,会调用observer的onNext、onComplete函数,而这个observer就是在上面第2步中执行订阅操作subscribe(observer)函数的这个observer,因此调用的就是我们实现的观察者observer中的onNext、onComplete函数.

这下面两步,就是observer和observable通讯的核心

以上是关于从源码的角度去探索RxJava笔记的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava源码分析&实践RxJava基本原理分析之订阅流

RxJava系列6(从微观角度解读RxJava源码)

从源码的角度理解四大组件的工作过程——Android开发艺术探索笔记

理解RxJava线程模型

Rxjava 源码解析 - subscribe源码

springboot源码解析-从源码角度分析系统初始化器的实现原理