Rxjava 基本原理解析

Posted 东山爱叨叨

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava 基本原理解析相关的知识,希望对你有一定的参考价值。

       ReactiveX是一种编程模型,全称是ReactiveExtensions,即可扩展的响应式编程,微软在201211月开源,官网 reactivex.ioRxjavaRxjava语言版本。本系列文章将以rxjava2版本进行分析。

   本篇主要介绍观察者模式和rxjava是如何实现观察者模式的。响应式编程主要采用观察者模式进行开发,即观察者(observer)订阅可观察的(Observable,被观察者)。可观察的(Observable)通过调用观察者的方法来发射事件给它的所有观察者(observer)。Java代码表示如下:

 

 //观察者

public interface Observer {

   void update(Object... objs);

}

 

//被观察者

public class Observable {

 

   protected List<Observer> obserList = new ArrayList<>();

 

   /**

    * AttachObserver(通过实例注册观察者)

    */

   public void registerObserver(Observer observer) {

       //...

       obserList.add(observer);

       //...

    }

 

   /**

    * Unattach Observer(注销观察者)

    */

   public void unRegisterObserver(Observer observer) {

       //...

       obserList.remove(observer);

       //...

    }

 

   /**

    * notify all observer(通知所有观察者,在子类中实现)

    */

   public  voidnotifyObservers(Object... objs){

    for (Observer observer : obserList) {

           this.notifyObserver(observer, objs);

       }

    }

 

   /**

    * notify one certain observer(通知某一个确定的观察者)

    *

    * @paramcls

    * @paramobjs

    */

   public   voidnotifyObserver(Observer observer, Object... objs){

   //...

     observer.update(objs);

    //...

    }

}

 

/**

 *Create by Dawson on 2018/8/12.

 */

public class MainTest {

   public static void main(String[] args) {

Observable observable = new Observable();//创建被观察者

Observer observer=new Observer() {//创建观察者

             @Override

             public void update(Object... objs) {

                System.out.print(objs[0]);

             }

};

observable.registerObserver(observer);//订阅注册观察者

observable.notifyObservers("hello dawson");//发送事件给观察者

    }

}

 

   代码一目了然,再来看看rxjava是如何实现观察者模式的。先看看使用的代码,再通过代码查看和分析源代码。如下是rxjava2.0的最基本的最简单使用方式:

Observable<String> observable = newObservable<String>() {//创建被观察者

           @Override

           protected void subscribeActual(Observer<? super String> observer){

               observer.onNext("hello");

           }

       };

Observer<String> observer = newObserver<String>() {//创建观察者

   @Override

   public void onSubscribe(Disposable d) {

 

    }

 

   @Override

   public void onNext(String value) {

 

    }

 

   @Override

   public void onError(Throwable e) {

 

    }

 

   @Override

   public void onComplete() {

 

    }

};

observable.subscribe(observer);//订阅观察者

   首先创建观察者observable,并重写subscribeActual 用于发送事件。实际使用中会使用create操作符创建一个ObservableCreate 的被观察着,后面文章会讲述。然后创建一个观察者observer,并重写onnext等方法。最后调用被观察着的subscribe方法订阅观察者。而这个事件流程的也是从subscribe订阅开始的。subscribe方法源代码如下:

public final void subscribe(Observer<?super T> observer) {

     //...

           subscribeActual(observer);

      //...

}

  会调用抽象方法subscribeActual,并将观察者传入。而我们重写了方法subscribeActual,其参数就是传入的观察者observer,在重新的方法subscribeActual直接调用观察者observeronnext方法,实现事件通知。


以上是关于Rxjava 基本原理解析的主要内容,如果未能解决你的问题,请参考以下文章

浅析RxJava 1.x&2.x版本使用区别及原理:ObservableFlowable等基本元素源码解析

浅析RxJava 1.x&2.x版本区别及原理:maplift操作符源码解析

浅析RxJava 1.x&2.x版本区别及原理:maplift操作符源码解析

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析

浅析RxJava 1.x&2.x版本区别及原理:1.x 版本 Scheduler调度线程变换及subscribeOnobserveOn方法源码解析