框架手写系列---RxJava之从零开始
Posted 战国剑
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了框架手写系列---RxJava之从零开始相关的知识,希望对你有一定的参考价值。
RxJava是我们日常在java与android中十分常用的一个库。它的作用是作为一个异步链式调用框架,将一个可能复杂的程序流程,转换为从顶向下的线性调用。避免了传统流程中,程序结构复杂以及可能的多重回调的样式。
这里将从零开始,手写一个RxJava,从代码和设计模式角度做步步变换,一步步的完成该框架。
一、传统观察者模式如何转换为链式调用
都知道RxJava是基于观察者模式的。但在使用RxJava中,可以看到这与传统的观察者模式是有较大差别的。
传统观察者模式:基本上是一个被观察者对应多个观察者。观察者都订阅同一个被观察者,当被观察者有变化时,通知已订阅的观察者。
传统观察者模式的简易代码,如下:
//观察者抽象接口
public interface Observer
public void update(String message);
//被观察者抽象接口
public interface Observable
/**
* 增加订阅者
* @param observer
*/
public void attach(Observer observer);
/**
* 删除订阅者
* @param observer
*/
public void detach(Observer observer);
/**
* 通知订阅者更新消息
*/
public void notify(String message);
/***************** 以下是实例 *****************/
//观察者实例
public class WeixinUser implements Observer
// 微信用户名
private String name;
public WeixinUser(String name)
this.name = name;
@Override
public void update(String message)
System.out.println(name + "-" + message);
//被观察者实例
public class SubscriptionObservable implements Observable
//储存订阅公众号的微信用户
private List<Observer> weixinUserlist = new ArrayList<Observer>();
@Override
public void attach(Observer observer)
weixinUserlist.add(observer);
@Override
public void detach(Observer observer)
weixinUserlist.remove(observer);
@Override
public void notify(String message)
for (Observer observer : weixinUserlist)
observer.update(message);
/***************** 以下是调用方式 *****************/
public void test()
SubscriptionObservable mSubscriptionObservable = new
SubscriptionObservable();
//创建微信用户
WeixinUser user1=new WeixinUser("a");
WeixinUser user2=new WeixinUser("b");
WeixinUser user3=new WeixinUser("c");
//订阅公众号
mSubscriptionObservable.attach(user1);
mSubscriptionObservable.attach(user2);
mSubscriptionObservable.attach(user3);
//被观察者更新发出消息给订阅的微信用户
mSubscriptionObservable.notify("信息更新了");
传统的观察者模式中,有多个的观察者,一个被观察者,并且事件的变化是由被观察者自身产生的。
RxJava源码中,有至少一个的被观察者(每一个操作符都会产生一个被观察者),一个观察者。链式的写法上,为了统一结构,用了被观察者订阅观察者的写法(实际上,仍是观察者订阅被观察者)。
怎样将传统的观察者模式变形为RxJava中用到的观察者模式呢?
创建抽象的观察者与被观察者,结构如下:
//观察者抽象接口
public interface Observer<T>
void onSubscribe();
void onNext(T t);
void onError(Throwable throwable);
void onComplete();
//被观察者抽象接口 -- 因为是一对一关系,此处不需要列表存储,subscribe表示订阅关系
public interface ObservableSource<T>
void subscribe(Observer<T> observer);
其中Observer,仿造RxJava的观察者结构,onSubscribe在建立订阅关系时,被调用。
ObservableSource,就是被观察者的结构,此处只定义了subscribe操作(订阅)。
RxJava最精华的地方,在于被观察者的操作符转换(异步等操作都在此处实现),那么它是如何实现的呢?这里就涉及到另一种设计模式:装饰者模式。
RxJava的每一个操作符调用后,都会产生一个新的被观察者。这个新的被观察者的持有上游最近的被观察者以及观察者引用。这种做法也是装饰者模式最直观的应用。
先看看装饰者模式:
可以参考该文章:https://www.cnblogs.com/of-fanruice/p/11565679.html ,该文章也是以 header first java 系列中的咖啡作为例子。
为什么要用装饰者模式,来生产各种操作符返回的被观察者?为什么不使用建造者模式?
装饰者模式最大的好处是对各种类型的被观察者之间解耦,RxJava目前已知的操作符就已经有上百种,如果使用单纯的类继承方式来生成被观察者,那么类爆炸是肯定的,而且随着操作符的增多,操作符之间的组合操作将越来越难以实现。此外类继承还会导致下游的被观察者,能感知到上游处,除了最近被观察者外的其他被观察者信息,这与我们的初衷是不符合的。
链式调用,第一反应,可能就是使用建造者模式,此处不用的原因也是因为这样会导致下游的被观察者,能感知到上游处,除了最近被观察者外的其他被观察者信息
那么,我们如何利用装饰者模式,生成我们需要的被观察者呢?
装饰者模式,类图如下:
二、根据装饰者模式类图,生成我们需要的被观察者
如上类图,我们将定义的基础被观察者接口和扩展与之一一对应。
2-1 最基础的被观察者接口,前面已经定义:ObservableSource。--------->对应Component。
2-2 抽象类Observable,实现ObservableSource接口。并作为对外的工具类使用。
2-3 抽象类ObservableBase,实现ObservableSource接口。--------->对应Decorator。
2-4 操作符ObservableCreate,实现Observable接口,实例化。--------->对应ConcreteDecoratorA。
2-5 操作符ObservableMap,实现ObservableBase接口,实例化。--------->对应ConcreteDecoratorB。
.......后续的操作符如异步操作符等,也对应类似的ConcreteDecoratorC、ConcreteDecoratorD等。
//part 1
public abstract class Observable<T> implements ObservableSource<T>
@Override
public void subscribe(Observer<T> observer)
realSubscribe(observer);
protected abstract void realSubscribe(Observer<T> observer);
public static<T> Observable<T> create(ObservableOnSubscribe<T> observableOnSubscribe )
return new ObservableCreate<>(observableOnSubscribe);
public <R> Observable<R> map(FunctionMap<T,R> functionMap)
return new ObservableMap<>(this,functionMap);
public Observable<T> observeOn()
return new ObservableObserveOn<>(this);
public Observable<T> scribeOn()
return new ObservableSubscribeOn<>(this);
//part 2
public abstract class ObservableBase<T,U> extends Observable<U>
final Observable<T> source;
public ObservableBase(Observable<T> source)
this.source = source;
//part 3
public class ObservableCreate<T> extends Observable<T>
final ObservableOnSubscribe<T> observableOnSubscribe;
public ObservableCreate(ObservableOnSubscribe<T> observableOnSubscribe)
this.observableOnSubscribe = observableOnSubscribe;
@Override
protected void realSubscribe(Observer<T> observer)
observer.onSubscribe();
observableOnSubscribe.subscribe(observer);
final static class ObserverOnCreate<T> extends ObserverBase<T>
final Observer<T> observer;
public ObserverOnCreate(Observer<T> observer)
this.observer = observer;
@Override
public void onNext(T t)
observer.onNext(t);
@Override
public void onError(Throwable throwable)
observer.onError(throwable);
@Override
public void onComplete()
observer.onComplete();
//part 4
public class ObservableMap<T, U> extends ObservableBase<T,U>
final FunctionMap<T, U> functionMap;
public ObservableMap(Observable<T> source, FunctionMap<T, U> functionMap)
super(source);
this.functionMap = functionMap;
@Override
protected void realSubscribe(Observer<U> observer)
observer.onSubscribe();
source.subscribe(new MapObserver<>(observer, functionMap));
final static class MapObserver<T, U> extends ObserverBase<T>
final Observer<U> observer;
final FunctionMap<T, U> functionMap;
public MapObserver(Observer<U> observer, FunctionMap<T, U> functionMap)
this.observer = observer;
this.functionMap = functionMap;
@Override
public void onNext(T t)
U u = functionMap.apply(t);
observer.onNext(u);
@Override
public void onError(Throwable throwable)
observer.onError(throwable);
@Override
public void onComplete()
observer.onComplete();
三、特殊的操作符-异步的实现
异步的实现,其实与通常的操作符实现类似。只是在其中加入了线程的切换。以下是让观察者回调运行在主线程的例子(android中view需要更新在主线程,该操作是常用操作)。
让上游运行在子线程中,其实也是类似,可将回调放置在子线程或者线程池中调用,此处就不贴出。
//此处是个简单例子,让观察者的回调运行在主线程(除onSubscribe外)
public class ObservableObserveOn<T> extends ObservableBase<T, T>
public ObservableObserveOn(Observable<T> source)
super(source);
@Override
protected void realSubscribe(Observer<T> observer)
observer.onSubscribe();
source.subscribe(new ObserverOnObserveOn<>(observer));
final static class ObserverOnObserveOn<T> extends ObserverBase<T>
final android.os.Handler handler;
final Observer<T> observer;
public ObserverOnObserveOn(Observer<T> observer)
this.observer = observer;
handler = new android.os.Handler(Looper.getMainLooper());
@Override
public void onNext(T t)
handler.post(new Runnable()
@Override
public void run()
observer.onNext(t);
);
@Override
public void onError(Throwable throwable)
handler.post(new Runnable()
@Override
public void run()
observer.onError(throwable);
);
@Override
public void onComplete()
handler.post(new Runnable()
@Override
public void run()
observer.onComplete();
);
四、总结
按上述的方式,完成了RxJava的主要脉络和思想的重写。上面讲述的更多的是一种思想,利用观察者模式的变种和装饰者模式的使用,构建一个RxJava的主体。该做法也是目前RxJava 2的做法。
demo地址:https://github.com/yangzhaomuma/RxJava
以上是关于框架手写系列---RxJava之从零开始的主要内容,如果未能解决你的问题,请参考以下文章