RxJava源码分析
Posted showCar
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava源码分析相关的知识,希望对你有一定的参考价值。
又好长一段时间没写博客了。今天来个较简短的博客来强势回归哈哈。上篇博客写了RN的源码分析,出乎意料的就有两个出版社联系我要不要出书,可见RN的火热。本来也想写RN的,但因为最近接触了挺多android的东东,还是想先总结总结先。RN就放后面啦。
什么是RxJava
接触RxJava也一段时间了,一直想写下关于它的文章,RxJava是用来实现异步框架的,类似于AsyncTask。这里推荐看下给Android开发者的RxJava详解这篇文章,写得很好。
RxJava能够很简洁的完成异步操作,在逻辑很复杂的情况下,RxJava的优势就很明显。
RxJava的使用
先看下RxJava完成网络请求的代码:
Observable.create(new Observable.OnSubscribe<Integer>()
@Override
public void call(Subscriber<? super Integer> subscriber)
Log.e("rxjava","observable call");
subscriber.onNext(1);
).map(new Func1<Integer, String>()
@Override
public String call(Integer integer)
return "===>" + integer;
).subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Action1<String>()
@Override
public void call(String s)
);
链式编程风格。简单讲下上面代码。
* Observable:被观察者,有变化时通知观察者。
* Observer:观察者,监测被观察者,有变化就做相应的触发。
* OnSubscribe:Observable中的成员变量。
首先OnSubscribe的call方法中实现业务逻辑,如网络请求等,可以在子线程中执行。subscribeOn(Schedulers.io())就会在内部实现是是用一个无数量上限的线程池,在线程中实现相应操作。observeOn(AndroidSchedulers.mainThread())用于指定观察者的执行环境,这里指定的是主纯程。执行非常简单吧。
在这里给出我自己写的Demo地址,使用RxJava和Retrofit一起进行网络请求。Demo中也对RxJava与Retrofit做了封装。可以看下。
代码下载
源码分析
进入本篇博客的重点。讲下RxJava的源码。主要是了解下它的思想,很值得学习。以下面那段代码进行分析。
Observable.create(new Observable.OnSubscribe<Integer>()
@Override
public void call(Subscriber<? super Integer> subscriber)
Log.e("rxjava","observable call");
subscriber.onNext(1);
).map(new Func1<Integer, String>()
@Override
public String call(Integer integer)
return "===>" + integer;
).subscribe(new Action1<String>()
@Override
public void call(String s)
);
首先看create:
public static <T> Observable<T> create(OnSubscribe<T> f)
return new Observable<T>(hook.onCreate(f));
这里的f就是我们定义的OnSubscribe。这里简称On1。hook.onCreate(f)的代码很简单如下:
public <T> OnSubscribe<T> onCreate(OnSubscribe<T> f)
return f;
这里只是直接把f返回,而f在这里是On1。然后就新建一了个被观察者,这里简称Ob1。看下Observable的构造方法:
protected Observable(OnSubscribe<T> f)
this.onSubscribe = f;
直接把On1赋给 Ob1里的onSubscribe变量。即Ob1.onSubscribe就是On1。记住啦。接下来,看下Map的代码,从上面分析可知,它是调用的Ob1.map()。
public final <R> Observable<R> map(Func1<? super T, ? extends R> func)
return lift(new OperatorMap<T, R>(func));
这里实例化了OperatorMap,并将Func1作为参数传入。
public final class OperatorMap<T, R> implements Operator<R, T>
final Func1<? super T, ? extends R> transformer;
public OperatorMap(Func1<? super T, ? extends R> transformer)
this.transformer = transformer;
@Override
public Subscriber<? super T> call(final Subscriber<? super R> o)
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
return parent;
static final class MapSubscriber<T, R> extends Subscriber<T>
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper)
this.actual = actual;
this.mapper = mapper;
@Override
public void onNext(T t)
R result;
try
result = mapper.call(t);
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
actual.onNext(result);
其中transformer就是存的Func1。看下看下lift的代码:
public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator)
return new Observable<R>(new OnSubscribeLift<T, R>(onSubscribe, operator));
回顾下这里的operator就是上面的OperatorMap,其中transformer就是我们定义的Func1。主要是调用OnSubscribeLift。
public final class OnSubscribeLift<T, R> implements OnSubscribe<R>
static final RxJavaObservableExecutionHook hook = RxJavaPlugins.getInstance().getObservableExecutionHook();
final OnSubscribe<T> parent;
final Operator<? extends R, ? super T> operator;
public OnSubscribeLift(OnSubscribe<T> parent, Operator<? extends R, ? super T> operator)
this.parent = parent;
this.operator = operator;
@Override
public void call(Subscriber<? super R> o)
try
Subscriber<? super T> st = hook.onLift(operator).call(o);
try
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
catch (Throwable e)
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
catch (Throwable e)
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
首先先说明,这个类很重要。这里parent就是On1,operator就是OperatorMap。很好理解,我们先定义的On1再定义的Func1,所以叫On1为parent,好记。这里我们把生成的OnSubscribeList类简称为lift。多啰嗦下,lift的的parent是On1,operator是OperatorMap(存着Func1),记住,调用是在.subscribe方法中,这此之前,这些call方法都还不会运行。那什么时候运行呢。别急,让我们来看看subscribe方法。上面最后又生成了一个Observable。我们叫它Ob2。
protected Observable(OnSubscribe<T> f)
this.onSubscribe = f;
可知,它把lift作为参数存入了Ob2的onSubscribe中。所以Ob2.onSubscribe=lift。好终于到最后的subscribe了,来看看它的源码:
public final Subscription subscribe(final Action1<? super T> onNext)
if (onNext == null)
throw new IllegalArgumentException("onNext can not be null");
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
简单,主要是生成了ActionSubscriber。这里OnNext就是我们定义的Action1,好,把Action1作为参数传到ActionSubscriber生成一个Subscriber,我们把生成的这个Subscriber称为Sub1。看下它的代码:
public final class ActionSubscriber<T> extends Subscriber<T>
final Action1<? super T> onNext;
final Action1<Throwable> onError;
final Action0 onCompleted;
public ActionSubscriber(Action1<? super T> onNext, Action1<Throwable> onError, Action0 onCompleted)
this.onNext = onNext;
this.onError = onError;
this.onCompleted = onCompleted;
@Override
public void onNext(T t)
onNext.call(t);
@Override
public void onError(Throwable e)
onError.call(e);
@Override
public void onCompleted()
onCompleted.call();
没啥,但从这我们就知道,如果调用sub1的onNext,就是调用Action1的call方法。继续跟subscribe方法,它把sub1作为参数传了进去。
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable)
// validate and proceed
if (subscriber == null)
throw new IllegalArgumentException("observer can not be null");
if (observable.onSubscribe == null)
throw new IllegalStateException("onSubscribe function can not be null.");
/*
* the subscribe function can also be overridden but generally that's not the appropriate approach
* so I won't mention that in the exception
*/
// new Subscriber so onStart it
subscriber.onStart();
/*
* See https://github.com/ReactiveX/RxJava/issues/216 for discussion on "Guideline 6.4: Protect calls
* to user code from within an Observer"
*/
// if not already wrapped
if (!(subscriber instanceof SafeSubscriber))
// assign to `observer` so we return the protected version
subscriber = new SafeSubscriber<T>(subscriber);
// The code below is exactly the same an unsafeSubscribe but not used because it would
// add a significant depth to already huge call stacks.
try
// allow the hook to intercept and/or decorate
hook.onSubscribeStart(observable, observable.onSubscribe).call(subscriber);
return hook.onSubscribeReturn(subscriber);
catch (Throwable e)
// special handling for certain Throwable/Error/Exception types
Exceptions.throwIfFatal(e);
// in case the subscriber can't listen to exceptions anymore
if (subscriber.isUnsubscribed())
RxJavaPluginUtils.handleException(hook.onSubscribeError(e));
else
// if an unhandled error occurs executing the onSubscribe we will propagate it
try
subscriber.onError(hook.onSubscribeError(e));
catch (Throwable e2)
Exceptions.throwIfFatal(e2);
// if this happens it means the onError itself failed (perhaps an invalid function implementation)
// so we are unable to propagate the error correctly and will just throw
RuntimeException r = new OnErrorFailedException("Error occurred attempting to subscribe [" + e.getMessage() + "] and then again while trying to pass to onError.", e2);
// TODO could the hook be the cause of the error in the on error handling.
hook.onSubscribeError(r);
// TODO why aren't we throwing the hook's return value.
throw r;
return Subscriptions.unsubscribed();
这里的subscriber就是上面的sub1。主要调用onSubscribeStart,它的两个参数,observable其实就是Ob2,Ob2.onSubscribe当然知道是谁啦,就是lift。看看这个方法的实现:
public <T> OnSubscribe<T> onSubscribeStart(Observable<? extends T> observableInstance, final OnSubscribe<T> onSubscribe)
// pass through by default
return onSubscribe;
OK,返回onSubscribe。那就是lift,则上面的代码等价于:
lift.call(sub1)
看OnSubscribeLift的call方法。
@Override
public void call(Subscriber<? super R> o)
try
Subscriber<? super T> st = hook.onLift(operator).call(o);
try
// new Subscriber created and being subscribed with so 'onStart' it
st.onStart();
parent.call(st);
catch (Throwable e)
// localized capture of errors rather than it skipping all operators
// and ending up in the try/catch of the subscribe method which then
// prevents onErrorResumeNext and other similar approaches to error handling
Exceptions.throwIfFatal(e);
st.onError(e);
catch (Throwable e)
Exceptions.throwIfFatal(e);
// if the lift function failed all we can do is pass the error to the final Subscriber
// as we don't have the operator available to us
o.onError(e);
o是sub1,list中的operator是operatorMap,parent是On1,这个我们上面己经分析过,忘记的可以回去看看。看下hook.OnLift方法:
public <T, R> Operator<? extends R, ? super T> onLift(final Operator<? extends R, ? super T> lift)
return lift;
lift是传进来的operatorMap,onLift返回lift。所以相当于是调用了operatorMap.call(sub1),看它OperatorMap中的call方法:
public Subscriber<? super T> call(final Subscriber<? super R> o)
MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
o.add(parent);
return parent;
这里的transformer就是Func1,创建了一个MapSubscriber,这里我们叫它Sub2。o是Sub1。看下MapSubscriber的代码:
static final class MapSubscriber<T, R> extends Subscriber<T>
final Subscriber<? super R> actual;
final Func1<? super T, ? extends R> mapper;
boolean done;
public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper)
this.actual = actual;
this.mapper = mapper;
@Override
public void onNext(T t)
R result;
try
result = mapper.call(t);
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
actual.onNext(result);
@Override
public void onError(Throwable e)
if (done)
RxJavaPluginUtils.handleException(e);
return;
done = true;
actual.onError(e);
@Override
public void onCompleted()
if (done)
return;
actual.onCompleted();
@Override
public void setProducer(Producer p)
actual.setProducer(p);
Ok,那么actual就是Sub1,mapper就是transformer,其实就是Func1,记住啦。回到OnSubscribeLift中的代码,接下来调用以下代码:
parent.call(st);
好,st就是我们创建的Sub2。parent就是On1。那么调用On1.call方法就回到了我们自定义的方法中:
public void call(Subscriber<? super Integer> subscriber)
Log.e("rxjava","observable call");
subscriber.onNext(1);
在例子中,我们调用的就是Sub2的onNext方法。看下Sub2的onNext方法,在MapSubscriber中:
@Override
public void onNext(T t)
R result;
try
result = mapper.call(t);
catch (Throwable ex)
Exceptions.throwIfFatal(ex);
unsubscribe();
onError(OnErrorThrowable.addValueAsLastCause(ex, t));
return;
actual.onNext(result);
oK,那result就为“===>1”。最后调用actual的onNext方法。其实就是Sub1的onNext方法,这在上面己经说过,调用的就是Action的call,如下:
@Override
public void onNext(T t)
onNext.call(t);
那最后就回到我们定义的Action的call方法了,t为“===>1”。
new Action1<String>()
@Override
public void call(String s)
这样调用就结束了。
总结
上面流程看起来有点复杂,先看下面这张图:
个人把它分为三个步骤。
- 从左边开始,从上到下每一次lift都会新建一个Observable,一个OnSubscribe和Operator从上面分析过程中,lift就是OnSubscribe。而opeatorMap就是这里的Operator。在没有调用subscribe方法之前,它们的call是不会调用的。直到调用到subscribe。
- 从下到上,创建Suscriber,每创建一个就调用OnSubscribe的call方法,接着调用Operator的Call方法,层层往上。之所以能层层往上是因为Suscriber中有parent变量。直到调用On1中的方法。
- 第三阶段,从上到下调用各个Subscriber,如这个例子就是先On1再Func1再Action1。从而完成整个过程 。
篇幅关系,没有分析RxJava的线程切换流程。但这里说明下,线程切换跟上文分析的一样,同样是用Lift来切换。被观察者的创建的对象OperatorSubscribeOn是一个OnSubscribe,而观察者创建的OperatorObserveOn对象是一个Operator。所以它们同样是执行上面的那些流程。之前subscribeOn只有一次有效,是因为上面第二阶段一直往上但又没执行,直到第一个声明subscribeOn之后再开始调用On1,所以明显只有一个有效。而之所以observeOn每个都有它之上的代码有效,是因为第三阶段往下执行,每执行完Subscriber如遇observeOn就会线程切换。所以是每次有效。
以上是关于RxJava源码分析的主要内容,如果未能解决你的问题,请参考以下文章
RxJava之七——RxJava 2.0 图文分析create() subscribe()map()observeOn()subscribeOn()源码