RxJava前奏之原理分析

Posted Alex_MaHao

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava前奏之原理分析相关的知识,希望对你有一定的参考价值。

RxJava 之前奏:原理分析

前言:RxJava在国内越来越火,本文只是原理分析的总结。代码不多,但有些绕,一步一步的推进,对于理解RxJava来说还是十分有好处的。但确实需要很大的毅力去坚持下去看完。

首先我们进入一个例子,关于猫的例子。

我们有个 Web API,能根据给定的查询请求搜索到整个互联网上猫的图片。每个图片包含可爱指数的参数(描述图片可爱度的整型值)。我们的任务将会下载到一个猫列表的集合,选择最可爱的那个,然后把它保存到本地。

首先定义实体类

public class Cat implements Comparable<Cat> {
    /**
     * 图片
     */
    String image;

    /**
     * 喜爱度
     */
    int cuteness;

    @Override
    public int compareTo(Cat another) {
        return Integer.compare(cuteness, another.cuteness);
    }

}

定义一个Uri类,用来定义图片保存的地址

public class Uri {

}

定义Api接口类,主要查询猫和保存猫的方法。

public interface Api {

    /**
     * 查询所有猫
     * @param query
     * @return
     */
    List<Cat> queryCats(String query);

    /**
     * 保存
     * @param cat
     * @return
     */
    Uri store(Cat cat);

}

定义业务类CastsHelp

public class CastsHelp {
    Api api;

    //保存猫
    public Uri saveTheCutestCat(String query) {
        List<Cat> queryCats = api.queryCats(query);
        Cat cat = findCustest(queryCats);

        Uri savedUri = api.store(cat);
        return savedUri;
    }

    //查询最受喜爱的猫
    private Cat findCustest(List<Cat> queryCats) {
        return Collections.max(queryCats);
    }
}

ok,我们的第一个例子写完了,让我们回过头来再仔细看一下这个例子,该业务逻辑类CastsHelp,里面包含了两个方法,其中saveTheCutestCat方法里面,包含了三个组合的方法,最终获取到保存猫的地址,并返回。这种组合方式即简单,又直白。一眼看去,清晰,明了。但我们往下看。

异步执行

我们知道,对于一些网络请求,我们通常都需要进行异步去执行,那么,对于刚才说saveTheCutestCat方法,我们就不能如此写,对于api的中的方法,我们需要利用接口回调的方式回传结果。于是我们修改我们的代码。

public interface Api {


    interface CatsQueryCallback{
        void onCatListReceived(List<Cat> cats);
        void onCatsQueryError(Exception e);
    }


    interface StoreCallback{
        void onCateStored(Uri uri);
        void onStoteFailed(Exception e);
    }
    /**
     * 查询所有猫
     * @param query
     * @return
     */
    List<Cat> queryCats(String query,CatsQueryCallback castQueryCallback);


    /**
     * 保存
     * @param cat
     * @return
     */
    Uri store(Cat cat,StoreCallback storeCallback);

}

修改了一下api中的方法,因为对于猫和保存猫,分别为网络请求,和I/O操作,我们不能进行同步操纵,所以添加了两个接口,分别查询猫的和保存猫地址的结果的回调。

同时我们需要修改一下我们的业务逻辑类CastsHelp,也需要添加接口回调。

public interface CutestCatCallback{
        void onCutestCatSaved(Uri uri);

        void onCutestCatFail(Exception e);
    }

    Api api;

    public void saveTheCutestCat(String query,final CutestCatCallback custestCatCallback) {
        List<Cat> queryCats = api.queryCats(query,new CatsQueryCallback() {

            @Override
            public void onCatsQueryError(Exception e) {
                //查询猫失败
                custestCatCallback.onCutestCatFail(e);
            }


            @Override
            public void onCatListReceived(List<Cat> cats) {
                Cat cat = findCustest(cats);

                api.store(cat, new StoreCallback() {

                    @Override
                    public void onStoteFailed(Exception e) {
                        //保存猫失败
                        custestCatCallback.onCutestCatFail(e);

                    }

                    @Override
                    public void onCateStored(Uri uri) {
                        custestCatCallback.onCutestCatSaved(uri);
                    }
                });
            }
        });

    }

    private Cat findCustest(List<Cat> queryCats) {
        return Collections.max(queryCats);
    }

代码开始复杂了,我们再来分析一下代码,在CastsHelp的saveTheCutestCat方法,我们传入了一个url,以及结果的回调,及该方法调用者用来实现的接口回调。
- 首先,调用api的queryCats方法查询所有的猫,因为该方法是异步的,所以我们传入一个接口的实现类,该接口包含了两个回调方法,onCatsQueryError()查询失败的方法,在此方法中,我们直接调用调用者实现的接口告诉调用者获取信息出现异常,onCatListReceived(List cats)方法,表明查询猫成功,并返回猫列表。
- 我们在onCatListReceived(List cats)方法内,此时查询猫列表的数据已经返回,我们继续调用了获取最喜爱的猫方法findCustest,然后保存猫。store()方法又包含了两个回调方法,在onStoteFailed()方法中,依然如上。在成功的方法onCateStored()中,我们将获取到的值通过调用者的实现接口进行回传给调用者。

好了,我们完成了异步任务的处理,代码复杂度大大的提升了,而且对于这么多接口回调,看着都心理难受。

接口回调的封装

我们加入异步任务之后,整个程序在逻辑上的复杂度,代码的复杂度上都大大的提高了,但我们发现该接口有些共同点,我们可以进行封装。
首先,先把之前添加的接口放在一起类比一下:

    //业务逻辑类
    public interface CutestCatCallback{
        void onCutestCatSaved(Uri uri);

        void onCutestCatFail(Exception e);
    }


    //查询猫
    interface CatsQueryCallback{
        void onCatListReceived(List<Cat> cats);
        void onCatsQueryError(Exception e);
    }

    //保存猫
    interface StoreCallback{
        void onCateStored(Uri uri);
        void onStoteFailed(Exception e);
    }

看到三个接口,我们会发现,他们都有一个成功的回调和一个失败的回调,失败的回调参数一样,而成功的回调接口不一样。那么我们来抽取一下,抽取了一个如下的新接口回调:

public interface Callback<T> {

    void onResult(T result);

    void onError(Exception e);

}

新的接口回调,比较简单,成功和失败的方法,添加了泛型,那么我们把它移入项目中,这样关于我们的api接口又有所变动:

public interface Api {
    /**
     * 查询所有猫
     * @param query
     * @return
     */
    void queryCats(String query,Callback castQueryCallback);


    /**
     * 保存
     * @param cat
     * @return
     */
    void store(Cat cat,Callback storeCallback);

}

在看一下我们的业务逻辑类

public class CastsHelp {

    Api api;

    public void saveTheCutestCat(String query,final Callback<Uri> custestCatCallback) {
        api.queryCats(query, new Callback<List<Cat>>() {

            @Override
            public void onResult(List<Cat> result) {
                Cat cat = findCustest(result);
                api.store(cat, new Callback<Uri>() {

                    @Override
                    public void onResult(Uri result) {
                        custestCatCallback.onResult(result);
                    }

                    @Override
                    public void onError(Exception e) {
                        custestCatCallback.onError(e);
                    }
                });
            }

            @Override
            public void onError(Exception e) {
                custestCatCallback.onError(e);

            }
        });

    }

    private Cat findCustest(List<Cat> queryCats) {
        return Collections.max(queryCats);
    }

逻辑上本有太大的变动,但在代码上,我们将之前定义的三个接口,统一改成了我们新定义的Callback接口。代码简洁了一些,同时也便于理解了一些。

再次进阶

在上面我们的Demo中,我们通过定义一个泛型回调,大大降低了代码量,但还是存在一个问题,及我们每次调用这个方法,都需要传入两个参数,分别为url和接口回调。如果,我们能不能只请求一个参数url,同时返回一个对象,该对象包含了我们所需要的回调信息。

那么我们在思考一下,我们需要的是一个对象,该对象包含了一个只有接口回调为参数的方法,我们称之为AsyncJob。

public abstract class AsyncJob<T> {

    public abstract void start(Callback<T> callback);

}

我们让查询的方法等的返回值为该对象,则返回的该对象中调用start方法即可开始执行回调。

因为我们的Api中的两个方法是通过接口回掉来进行的,所以我们需要对Api类进行包装。ApiWrapper.

/**
 * 接口的包装类 备注人: Alex_MaHao
 * 
 * @date 创建时间:2016年2月26日 上午10:11:28
 */
public class ApiWrapper {
    Api api;

    public AsyncJob<List<Cat>> queryCats(final String query) {
        return new AsyncJob<List<Cat>>() {

            @Override
            public void start(final Callback<List<Cat>> callback) {
                api.queryCats(query, new Callback<List<Cat>>() {

                    @Override
                    public void onResult(List<Cat> result) {
                        callback.onResult(result);

                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);

                    }
                });
            }
        };

    }


    public AsyncJob<Uri> store(final Cat cat) {
        return new AsyncJob<Uri>() {

            @Override
            public void start(final Callback<Uri> callback) {
                api.store(cat, new Callback<Uri>() {

                    @Override
                    public void onResult(Uri result) {
                        callback.onResult(result);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }

        };
    }

}

我们看一下我们的ApiWrapper中的方法,因为两个方法比较类似,我们只看queryCats方法,理解一下即可。如果单纯的看逻辑,会发现比较绕,我们看一下第一个方法应该怎么使用。

        //调用方法,返回包含我们回调信息的asyncJob类
        AsyncJob<List<Cat>> async = new ApiWrapper().queryCats("query");

        //启动回调方法,获取回调结果
        async.start(new Callback<List<Cat>>() {

            @Override
            public void onResult(List<Cat> result) {
                //所有猫的集合
            }

            @Override
            public void onError(Exception e) {
                //错误回调
            }
        });

可以看到,我们的queryCats()传入一个url地址,返回一个包含我们回调接口的对象,这时候,我们只需要调用start()方法,即可获取到我们的回调结果。

那么,同时我们也需要修改一下CastsHelp中的代码

public class CastsHelp {

    ApiWrapper apiWrapper;

    public AsyncJob<Uri> saveTheCutestCat(final String query) {
        return new AsyncJob<Uri>() {

            @Override
            public void start(final Callback<Uri> callback) {
                apiWrapper.queryCats(query).start(new Callback<List<Cat>>() {

                    @Override
                    public void onResult(List<Cat> result) {
                        Cat cat = findCustest(result);

                        apiWrapper.store(cat).start(new Callback<Uri>() {

                            @Override
                            public void onResult(Uri result) {
                                callback.onResult(result);
                            }

                            @Override
                            public void onError(Exception e) {
                                callback.onError(e);
                            }
                        });
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });;
            }
        };

    }

    private Cat findCustest(List<Cat> queryCats) {
        return Collections.max(queryCats);
    }
}

该方法也只是把我们的方法的返回值对象做了一下转换,修改了其返回值对象为我们的AsyncJob。

我们把该方法在进行整理,于是就有了如下代码

public class CastsHelp {

    ApiWrapper apiWrapper;

    public AsyncJob<Uri> saveTheCutestCat(final String query) {

        //猫列表数据的接口回调
        final AsyncJob<List<Cat>> catsListAsyncJob = apiWrapper
                .queryCats(query);

        //查询最喜爱的猫
        final AsyncJob<Cat> cutestCatAsyncJob = new AsyncJob<Cat>() {

            @Override
            public void start(final Callback<Cat> callback) {
                catsListAsyncJob.start(new Callback<List<Cat>>() {

                    @Override
                    public void onResult(List<Cat> result) {
                        callback.onResult(findCustest(result));

                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };

        //获取保存的url地址的回掉对象并返回
        AsyncJob<Uri> storedUriAsyncJob = new AsyncJob<Uri>() {

            @Override
            public void start(final Callback<Uri> callback) {
                cutestCatAsyncJob.start(new Callback<Cat>() {

                    @Override
                    public void onResult(Cat result) {
                        apiWrapper.store(result).start(callback);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }
        };

        return storedUriAsyncJob;

    }

    private Cat findCustest(List<Cat> queryCats) {
        return Collections.max(queryCats);
    }
}

map和flatMap

在我们的CastsHelp的saveTheCutestCat()方法中,有一个查询喜爱都最高的猫的列表,那么我们能不能这样理解呢,从输入和输出来看,我们期望将一个AsyncJob

public interface Func<T,R> {
    R call(T t);
}

该接口很简单,传入T,通过call方法返回R。

我们在AsyncJob中添加map方法,先看代码

public <R> AsyncJob<R> map(final Func<T,R> func){
        final AsyncJob<T> source = this;

        return new AsyncJob<R>(){

            @Override
            public void start(final Callback<R> callback) {
                source.start(new Callback<T>() {

                    @Override
                    public void onResult(T result) {
                        //调用者实现的call方法
                        R r = func.call(result);

                        callback.onResult(r);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);
                    }
                });
            }

        };

    }

我们看一下这个方法,在之前我们这样分析,我们需要的是将AsyncJob

public  AsyncJob<Cat> map(final Func<List<Cat>,Cat> func){
}

ok,我们在深入的看一下map方法,可以看到我们返回的对象,无非就是将当前的AsyncJob对象的回调信息与我们返回的信息所绑定,同时在onResult方法中将数据类型进行转换。

那么,在CastsHelp中的查询最喜爱的猫的方法,可改成如下所示:

    final AsyncJob<Cat> cutestCatAsyncJob = catsListAsyncJob.map(new Func<List<Cat>, Cat>() {

            @Override
            public Cat call(List<Cat> t) {
                return findCustest(t);
            }
        });

map对象,在转换的时候是将一个对象转换为另一个对象,通过的是返回值的方式,业绩是同步的方式进行的,那么我们会不会有另一种情况呢,即我们的call方法返回的也是一个回调信息,及也是异步的呢。

这种情况也是存在的,我们看一下我们的保存猫的方法,该方法是一个异步的任务,他不能直接返回值,而是返回一个回调的信息,我们看一下保存猫的方法,public AsyncJob<Uri> store(final Cat cat),很明显,他不能向我们之前查询喜爱都最高的猫时直接获取Cat对象,并调用onResult方法并返回,他是一个回调对象,我们需要做的是将返回的回调对象和我们转换过后的相绑定。这时候我们定义了flatMap();

public <R> AsyncJob<R> flatMap(final Func<T, AsyncJob<R>> func ){
        final AsyncJob<T> source = this;
        return new AsyncJob<R>(){

            @Override
            public void start(final Callback<R> callback) {
                source.start(new Callback<T>() {

                    @Override
                    public void onResult(T result) {
                        AsyncJob<R> mapped = func.call(result);

                        mapped.start(callback);
                    }

                    @Override
                    public void onError(Exception e) {
                        callback.onError(e);

                    }
                });

            }

        };
    }

该方法将返回的Func回调对象与我们的flatMap对象绑定。那么,保存猫的方式也可以修改如下:

    AsyncJob<Uri> storedUriAsyncJob = cutestCatAsyncJob.flatMap(new Func<Cat, AsyncJob<Uri>>() {

            @Override
            public AsyncJob<Uri> call(Cat t) {
                // TODO Auto-generated method stub
                return apiWrapper.store(t);
            }
        });

最后,贴上CastHelps的完整代码,


    public AsyncJob<Uri> saveTheCutestCat(final String query) {

        //查询猫
        final AsyncJob<List<Cat>> catsListAsyncJob = apiWrapper
                .queryCats(query);

        //获取最喜爱的猫
        final AsyncJob<Cat> cutestCatAsyncJob = catsListAsyncJob.map(new Func<List<Cat>, Cat>() {

            @Override
            public Cat call(List<Cat> t) {
                return findCustest(t);
            }
        });

        //获取地址
        AsyncJob<Uri> storedUriAsyncJob = cutestCatAsyncJob.flatMap(new Func<Cat, AsyncJob<Uri>>() {

            @Override
            public AsyncJob<Uri> call(Cat t) {
                return apiWrapper.store(t);
            }
        });

        return storedUriAsyncJob;

    }
  • map和flatMap的总结
    • 相同点:都是通过Func对象的Call方法,让调用者来实现不同数据的对象转换
    • 不同点:Call方法转换的目标结果不同,导致对结果的处理不同。如果是同步的转换,不涉及接口回掉,异步等,map处理。如果Call方法返回的是一个回调对象,那么我们需要去绑定,所以使用flatMap.

细心的可以看到我们saveTheCutestCat方法还可以精简


    public AsyncJob<Uri> saveTheCutestCat(final String query) {

        return apiWrapper
                .queryCats(query)
                .map(new Func<List<Cat>, Cat>() {

                    @Override
                    public Cat call(List<Cat> t) {


                        return findCustest(t);
                    }
                }).flatMap(new Func<Cat, AsyncJob<Uri>>() {

                    @Override
                    public AsyncJob<Uri> call(Cat t) {

                        return apiWrapper.store(t);
                    }
                });

    }

RxJava 的引入

在引入RxJava 之前,我们之前写了那么多的代码势必要使用一下。那么怎么使用呢:

public static void main(String[] args) {
        CastsHelp castsHelp = new CastsHelp();
        castsHelp.saveTheCutestCat("ss").start(new Callback<Uri>() {

            @Override
            public void onResult(Uri result) {
                //Uri 地址对象
            }

            @Override
            public void onError(Exception e) {
                //异常
            }
        });
    }

ok,开始和RxJava进行类比。

AsyncJob类比Observable
CallBack类比Observer
start()类比 subscribe();

那么我们开始对我的类进行修改,在这之前需要导入rxjava的jar包

public class ApiWrapper {
    Api api;

    public Observable<List<Cat>> queryCats(final String query) {

        return Observable.create(new OnSubscribe<List<Cat>>() {

            @Override
            public void call(final Subscriber<? super List<Cat>> arg0) {
                api.queryCats(query, new CatsQueryCallback() {

                    @Override
                    public void onCatsQueryError(Exception e) {
                        arg0.onError(e);
                    }

                    @Override
                    public void onCatListReceived(List<Cat> cats) {
                        arg0.onNext(cats);
                    }
                });
            }
        });
    }

    public Observable<Uri> store(final Cat cat) {
        return Observable.create(new OnSubscribe<Uri>() {

            @Override
            public void call(final Subscriber<? super Uri> subscriber) {
                api.store(cat, new StoreCallback() {

                    @Override
                    public void onStoteFailed(Exception e) {
                        subscriber.onError(e);
                    }

                    @Override
                    public void onCateStored(Uri uri) {
                        subscriber.onNext(uri);
                    }
                });
            }
        });
    }

}

public class CastsHelp {

    ApiWrapper apiWrapper;

    public Observable<Uri> saveTheCutestCat(final String query) {
        return apiWrapper
                .queryCats(query)
                .map(new Func1<List<Cat>, Cat>() {

                        @Override
                        public Cat call(List<Cat> cats) {

                            return findCustest(cats);
                        }
                    })
                .flatMap(new Func1<Cat, Observable<Uri>>() {

                        @Override
                        public Observable<Uri> call(Cat cat) {

                            return apiWrapper.store(cat);
                        }
                    });

    }

    private Cat findCustest(List<Cat> queryCats) {
        return Collections.max(queryCats);
    }
}
public class Test {
    public static void main(String[] args) {
        CastsHelp castsHelp = new CastsHelp();
        castsHelp.saveTheCutestCat("ss").subscribe(new Observer<Uri>() {

            @Override
            public void onCompleted() {

            }

            @Override
            public void onError(Throwable arg0) {

            }

            @Override
            public void onNext(Uri arg0) {

            }
        });
    }
}

RxJava的原理已经理解和整理完毕。下面就开进入到了RxJava的整理与总结了。没多少代码,就不共享了。

长路漫漫啊。加油!!!





研究资料
NotRxJava懒人专用指南

以上是关于RxJava前奏之原理分析的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava源码分析&实践实践环节:map操作符功能实现

一起来造一个RxJava,揭秘RxJava的实现原理

RxJava2线程切换原理分析

Hystrix系列之执行原理分析

RxJava2探索-线程切换原理之observeOn

RxJava2探索-线程切换原理之observeOn