RxJava编程思想2-(实现简易版Rxjava,如何实现操作符?)

Posted 0 and 1

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava编程思想2-(实现简易版Rxjava,如何实现操作符?)相关的知识,希望对你有一定的参考价值。

前言

已经使用rxjava两个月了,觉得rxjava特别好用,爱不释手。本文目的是通过几百行的代码,帮助大家理解rxjava中的链式调用,操作符,线程切换是如何实现的。如果有写的不对的地方欢迎在评论区留言,如果觉得写的可以,请点赞,关注,谢谢。

代码链接: github

目录:

RxJava编程思想1-(实现简易版Rxjava,如何基本功能和链式调用?)
RxJava编程思想2-(实现简易版Rxjava,如何实现操作符?)
RxJava编程思想3-(实现简易版Rxjava,如何实现线程切换?)

操作符如何实现?我们先实现最简单的操作符— map操作符

我们先想一下操作符的设计初衷是什么?-----一句话概括:对被观察者的数据进行一系列的转换,处理,加工然后再分发给观察者。
数据是观察者发出的,如果我们想对数据进行处理,我们会如何设计?是不是很容易想到运用装饰者模式。
思路以及有了,现在就操刀吧。

插入一下装饰者模式的几个关键对象
(1)抽象组件(Component) :定义装饰方法的规范
(2)被装饰者(ConcreteComponent) :Component的具体实现,也就是我们要装饰的具体对象。
(3)装饰者组件(Decorator) :持有组件(Component)对象的实例引用,该类的职责就是为了装饰具体组件对象,定义的规范。
(4)具体装饰(ConcreteDecorator) :负责给构件对象装饰附加的功能

第一步:确定装饰者模式的抽象组件 ObservableSource
第二步:确定装饰者模式的装饰者组件:改造Observable类为抽象父类

public abstract class Observable<T> implements ObservableSource<T> 

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) 
        return new ObservableCreate(source);
    

    @Override
    public void subscribe(Observer<? super T> observer) 
        subscribeActual(observer);
    

    //实际的处理过程交个子类实现
    protected abstract void subscribeActual(Observer<? super T> observer);

第三步:被装饰者ObservableCreate

public final class ObservableCreate<T> extends Observable<T> 
    final ObservableOnSubscribe<T> source;

    public ObservableCreate(ObservableOnSubscribe<T> source) 
        this.source = source;
    

    @Override
    protected void subscribeActual(Observer<? super T> observer) 
        CreateEmitter<T> createEmitter = new CreateEmitter<T>(observer);
        observer.onSubscribe();
        try 
            source.subscribe(createEmitter);
         catch (Exception e) 
            e.printStackTrace();
        
    
    //把ObservableEmitter 分发的事件分发给observer

    static final class CreateEmitter<T> implements ObservableEmitter<T> 
        final Observer<? super T> observer;

        public CreateEmitter(Observer<? super T> observer) 
            this.observer = observer;
        

        @Override
        public void onNext(T value) 
            observer.onNext(value);
        

        @Override
        public void onError(Throwable error) 
            onError(error);
        

        @Override
        public void onComplete() 
            observer.onComplete();
        
    


第四步,具体装饰者ObservableMap
Function<? super T, ? extends U> function;//把装饰的部分操作交给用户定义,通过接口形式传递进来
ObservableSource source; //被装饰的对象

public class ObservableMap<T, U> extends Observable<U> 
    final Function<? super T, ? extends U> function;
    final ObservableSource<T> source;

    public ObservableMap(Observable<T> source, Function<? super T, ? extends U> function) 
        this.source = source;
        this.function = function;
    

    @Override
    protected void subscribeActual(Observer<? super U> observer) 
        MapObserver mapObserver = new MapObserver<T, U>(observer, function);
        source.subscribe(mapObserver);

    

    static final class MapObserver<T, U> implements Observer<T> 
        protected final Observer<? super U> actual;
        final Function<? super T, ? extends U> mapper;

        public MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) 
            this.actual = actual;
            this.mapper = mapper;
        

        @Override
        public void onSubscribe() 
            actual.onSubscribe();
        

        @Override
        public void onNext(T value) 
            CheckUtils.checkNotNull(value, "onNext called parameter can not be null");
            U u = null;
            try 
                u = mapper.apply(value);//如何转换交给子类实现
             catch (Exception e) 
                e.printStackTrace();
            
            actual.onNext(u);
        

        @Override
        public void onError(Throwable e) 
            actual.onError(e);
        

        @Override
        public void onComplete() 
            actual.onComplete();
        
    


//涉及到的类:Function,数据转换的接口,约定

public interface Function<T, R> 
    // T 表示输入值,R 表示输出值,把T转换成R,
    R apply(T value) throws Exception;


为了方便用户使用和链式调用,在装饰者组件Observable中 加入工具方法

 public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) 
        return new ObservableMap<T, R>(this, mapper);
    

下面就可以使用了:

public class Test 
    public static void main(String[] args) 
        Observable.create(new ObservableOnSubscribe<String>() 
            @Override
            public void subscribe(ObservableEmitter<String> observableEmitter) 
                System.out.println("发射数据" + "1");
                observableEmitter.onNext("1");
                System.out.println("发射数据" + "2");
                observableEmitter.onNext("2");
            
        ).map(new Function<String, Integer>() 
            @Override
            public Integer apply(String value) throws Exception 
                return Integer.parseInt(value);
            
        ).subscribe(new Observer<Integer>() 
            @Override
            public void onSubscribe() 

            

            @Override
            public void onNext(Integer value) 
                System.out.println("接收数据" + value);
            

            @Override
            public void onError(Throwable e) 

            

            @Override
            public void onComplete() 

            
        );
    

运行打印-------------------------------
发射数据1
接收数据1
发射数据2
接收数据2

其他操作符的设计方式雷同,实现细节不一样而已,看懂了这里再去看看源码就明白了。

以上是关于RxJava编程思想2-(实现简易版Rxjava,如何实现操作符?)的主要内容,如果未能解决你的问题,请参考以下文章

RxJava编程思想3-(实现简易版Rxjava,如何实现线程切换)

RxJava编程思想3-(实现简易版Rxjava,如何实现线程切换)

RxJava编程思想1-(实现简易版Rxjava,如何基本功能和链式调用?)

精通高级RxJava 2响应式编程思想

百度云网盘-36精通高级RxJava 2响应式编程思想

(百度云课程分享资源大全)IT视频课程:精通高级RxJava 2响应式编程思想