Android:rxjava简单实现原理(create操作符)

Posted JMW1407

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android:rxjava简单实现原理(create操作符)相关的知识,希望对你有一定的参考价值。

rxjava

1、观察者模式

观察者模式基于Subject这个概念,Subject是一种特殊对象,又叫做主题或者被观察者。当它改变时那些由它保存的一系列对象将会得到通知,而这一系列对象被称作Observer(观察者)。它们会对外暴漏了一个通知方法(比方说update之类的),当Subject状态发生变化时会调用的这个方法。

观察者模式很适合下面这些场景中的任何一个:

  • 1、当你的架构有两个实体类,一个依赖另一个,你想让它们互不影响或者是独立复用它们时。
  • 2、当一个变化的对象通知那些与它自身变化相关联的未知数量的对象时。
  • 3、当一个变化的对象通知那些无需推断具体类型的对象时。

通常一个观察者模式的类图是这样的:


注意:

  • 1、继承关系用带空心三角箭头的实线来表示,箭头从子类指向父类
  • 2、在 UML 类图中,双向的关联可以用带两个箭头或者没有箭头的实线来表示,单向的关联用带一个箭头的实线来表示,箭头从使用类指向被关联的类。
  • 3、实现关系使用带空心三角箭头的虚线来表示。实现(Realization)关系是接口与实现类之间的关系。在这种关系中,类实现了接口,类中的操作实现了接口中所声明的所有的抽象操作。

2、Rxjava的观察者模式?


3、Rxjava简单实现

Observable类

/**
 * 被观察者的核心抽象类
 * 也是使用框架的入口
 * @param <T>
 */
public abstract class Observable<T> implements ObservableSource<T> {
    @Override
    public void subscribe(Observer observer) {
        // 和谁建立订阅?
        // 怎么建立订阅?
        // 为了保证拓展性,交给具体的开发人员实现。这里提供一个抽象的方法
        subscribeActual(observer);
    }
    protected abstract void subscribeActual(Observer<T> observer);

    public  static <T> Observable<T> create(ObservableOnSubscribe<T> source){
        return new ObservableCreate<>(source);
    }
}

Observer接口

public interface Observer<T> {
    void onSubscribe();//建立订阅关系时候的回掉方法,什么时候和被观察者建立订阅关系,就会调用该方法
    void onNext(T t);
    void onComplete();
    void onError(Throwable throwable);

}

ObservableCreate类

/**
 * 1、创建一个被观察者
 * 2、被观察者发射事件由具体的either发射器来完成
 * @param <T>
 */
public class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;//创建一个被观察者
    }

    @Override
    protected void subscribeActual(Observer<T> observer) {
        observer.onSubscribe();//建立订阅的时候调用
        CreateEmitter<T> emitter = new CreateEmitter<T>(observer);
        source.subscribe(emitter);
    }

    static class CreateEmitter<T> implements Emitter<T>{
        Observer<T>observer;//这里持有一个观察者,当事件发生时,直接调用该观察者对事件进行调用即可。
        boolean done;//互斥实现

        public  CreateEmitter(Observer<T> observer){
            this.observer = observer;
        }

        public void onNext(T t){
            if(done) return;
            observer.onNext(t);
        }

        public void onError(Throwable throwable){
            if(done) return;
            observer.onError(throwable);
            done = true;
        }

        public void onComplete(){
            if(done) return;
            observer.onComplete();
            done = true;
        }
    }

}

Emitter接口

/**
 * 事件发射器
 * @param <T>
 */
public interface Emitter<T> {
    void onNext(T t);
    void onComplete();
    void onError(Throwable throwable);
}

ObservableOnSubscribe接口

/**
 * 被观察者和事件发射器建立关系
 * 被观察者和事件之间解偶
 * @param <T>
 */
public interface ObservableOnSubscribe<T> {
    void subscribe(Emitter<T> emitter);
}

ObservableSource接口

/**
 * 被观察者的顶层接口
 * @param <T>
 */

public interface ObservableSource<T> {
    void subscribe(Observer<T>observer);
}

RxjavaTest

public class RxjavaTest {
    public static void main(String[] args){
        Observable.create(new ObservableOnSubscribe<Object>() {
            @Override
            public void subscribe(Emitter<Object> emitter){
                System.out.println("subscribe:.....");
                emitter.onNext("aaaa");
                emitter.onNext("CCCC");
                emitter.onNext("ddddd");
                emitter.onError(new Throwable());
                emitter.onComplete();
            }

        }).subscribe(new Observer() {
            @Override
            public void onSubscribe() {
                System.out.println("onSubscribe...");
            }

            @Override
            public void onNext(Object o) {
                System.out.println("onNext:.... " + o);
            }

            @Override
            public void onComplete() {
                System.out.println("onComplete:... " );
            }

            @Override
            public void onError(Throwable throwable) {
                System.out.println("onError :... " );
            }
        });
    }

}

试验结果

onSubscribe...
subscribe:.....
onNext:.... aaaa
onNext:.... CCCC
onNext:.... ddddd
onError :... 

以上是关于Android:rxjava简单实现原理(create操作符)的主要内容,如果未能解决你的问题,请参考以下文章

Android RxJava入门教程 & 简单原理分析

RxJava的学习与实现

Android开发之《RXJava的简单实现》

Carson带你学Android:手把手带你入门神秘的Rxjava

一起来造一个RxJava,揭秘RxJava的实现原理

一起来造一个RxJava,揭秘RxJava的实现原理