Android RxJava/RxAndroid结合Retrofit使用

Posted 一口仨馍

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android RxJava/RxAndroid结合Retrofit使用相关的知识,希望对你有一定的参考价值。

概述

RxJava是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。更重要的是:使用RxJava在代码逻辑上会非常简洁明了,尤其是在复杂的逻辑上。告别迷之缩进。

Rxandroid是RxJava针对Android平台的拓展。

Retrofit是一个封装了okHttp的工具库,在上篇博文 Android 初探Retrofit2.0.1(最新版) 有过介绍,对Retrofit不太了解的读者,建议先行阅读。

RxJava GitHub地址:https://github.com/ReactiveX/RxJava

RxAndroid GitHub地址:https://github.com/ReactiveX/RxAndroid

Retrofit GitHub地址:https://github.com/square/retrofit

热身运动 - 观察者模式

RxJava 的异步实现,是通过一种扩展的观察者模式来实现的。下面简单介绍下观察者模式,熟练掌握观察者模式可跳过这一小节。

观察者模式

假设现在有两个对象A和B,在A发生某种变化时要主动通知B。这就是观察者模式。Android里View.setOnClickListener(new OnClickListener) 就是运用观察者模式的典型例子。在这个例子中View充当对象A的角色,OnClickListener充当B。View通过setOnClickListener将自己和OnClickListener联系(订阅)起来。当View捕获到点击事件之后,立马调用OnClickListener#onClick() 方法。还有通常我们自己定义的接口回调都是观察者模式的运用。

RxJava的观察者模式

RxJava基本概念:Observable (被观察者,相当于View)、 Observer (观察者,相当于OnClickListener)、 subscribe ()(订阅,相当于setOnClickListener()方法)事件。Observable 和 Observer 通过 subscribe() 方法实现订阅关系,从而 Observable 可以在需要的时候发出事件来通知 Observer。

RxJava除了普通的回调方法onNext()还有onCompleted() 和 onError()。

  1. onCompleted():事件队列完结。RxJava 不仅把每个事件单独处理,还会把它们看做一个队列。没有新的onNext()之后,调用此方法。
  2. onError():事件队列异常。在事件处理过程中出异常时,onError() 会被触发,同时队列自动终止,不允许再有事件发出。
  3. onCompleted() 和 onError()在一个队列中只能调用一个,并且是最后一个。onCompleted() 和 onError()还是互斥的,只能调用其中一个

回顾Retrofit

上篇博文我们使用Retrofit实现对网络的访问以及返回数据的解析,详情请见Android 初探Retrofit2.0.1(最新版),在这里我们再简单温习下
1. 创建WeatherInfoService,并制定请求数据的方式以及需要的查询参数
2. 创建相应的WeatherInfoBean
3. 创建Retrofit对象并使用GSON解析数据
4. 调用 weatherInfoService#getWeatherInfo(FORMAT, CITYNAME, KEY),获取call
5. 插入队列,并展示数据

RxJava/RxAndroid结合Retrofit

添加依赖

compile 'com.squareup.retrofit2:retrofit:2.0.1'
compile 'com.squareup.retrofit2:converter-gson:2.0.0-beta4'
compile 'com.squareup.retrofit2:adapter-rxjava:2.0.1'
compile 'io.reactivex:rxandroid:1.1.0'
compile 'io.reactivex:rxjava:1.1.0'

添加限权

<uses-permission android:name="android.permission.INTERNET" />

再次见到熟悉的WeatherInfoService

public interface WeatherInfoService 

    @GET("http://v.juhe.cn/weather/index?format=2&cityname=北京&key=b952ad7acbc7415f3f3c9bf274e39c45")
    Observable<WeatherInfo> getWeatherInfoByRxJava();

注意这里getWeatherInfoByRxJava() 的返回类型为Observable<WeatherInfo>, 这也就意味着我们在这里直接得到一个被观察者Observable!

还是那个MainActivity

        findViewById(R.id.btn2).setOnClickListener(new View.OnClickListener() 
            @Override
            public void onClick(View v) 
                Gson gson = new GsonBuilder().create();
                Retrofit retrofit = new Retrofit.Builder()
                        .baseUrl(BASE_URL)
                        //配置转化库,默认是Gson
                        .addConverterFactory(GsonConverterFactory.create(gson))
                        //配置回调库,采用RxJava
                        .addCallAdapterFactory(RxJavaCallAdapterFactory.create())
                        .build();
                WeatherInfoService weatherInfoService = retrofit.create(WeatherInfoService.class);
                getWeatherInfo(weatherInfoService,gson);
                getWeatherInfoByMap(weatherInfoService,gson);
                getWeatherInfoByFlatMap(weatherInfoService,gson);
             
        );

注意.addCallAdapterFactory(RxJavaCallAdapterFactory.create()) ,这行代码的作用是配置Retrofit回调库,采用RxJava。
这里我们写了getWeatherInfo(weatherInfoService,gson);getWeatherInfoByMap(weatherInfoService,gson);getWeatherInfoByFlatMap(weatherInfoService,gson);三个方法。接下来会一一讲解

MainActivity#getWeatherInfo()

在这个方法里实现了最基本的RxJava/RxAndroid和Retrofit的结合。代码如下

        weatherInfoService.getWeatherInfoByRxJava()// 返回类型为Observable<WeatherInfo>
                .subscribeOn(Schedulers.io())// 指定订阅者在io线程(第一次指定订阅者线程有效)
                .doOnSubscribe(new Action0()  //doOnSubscribe线程为最近的subscribeOn指定线程
                    @Override
                    public void call() 
                        Toast.makeText(MainActivity.this, "我在网络请求前执行", Toast.LENGTH_SHORT).show();
                    
                )
                .subscribeOn(AndroidSchedulers.mainThread())// 指定在主线程
                .observeOn(AndroidSchedulers.mainThread())// 指定观察者在主线程
                .subscribe(new Subscriber<WeatherInfo>() 
                    @Override
                    public void onCompleted() 
                        Log.i(TAG, "onCompleted");
                    

                    @Override
                    public void onError(Throwable e) 
                        tv.setText("error:" + e.getMessage());
                    

                    @Override
                    public void onNext(WeatherInfo weatherInfo) 
                        Log.i(TAG, gson.toJson(weatherInfo));
                        tv.setText(gson.toJson(weatherInfo));
                    
                );

代码里注释很多,补充几个需要注意的地方。
1. subscribeOn()只有第一次调用的时候就指定被观察者Observable所在线程。以后可以多次调用,但被观察者Observable所在线程已经指定
2. doOnSubscribe()在发送事件前执行,可以指定执行线程。一般在里面展示loading
3. observeOn()可多次调用并且每次都会改变观察者Observer/Subscriber所在线程。

MainActivity#getWeatherInfoByMap()

Observable的map()是个神奇的方法,它可以对被观察者Observable的泛型进行操作,并且返回另一个Observable传递给观察者Observer/Subscriber

    private void getWeatherInfoByMap(WeatherInfoService weatherInfoService, final Gson gson) 
        weatherInfoService.getWeatherInfoByRxJava()// 返回类型为Observable<WeatherInfo>
                .subscribeOn(Schedulers.io())// 指定订阅者在io线程(第一次指定订阅者线程有效)
                .doOnSubscribe(new Action0()  //doOnSubscribe线程为最近的subscribeOn指定线程
                    @Override
                    public void call() 
                        Toast.makeText(MainActivity.this, "我在网络请求前执行", Toast.LENGTH_SHORT).show();
                    
                )
                .subscribeOn(AndroidSchedulers.mainThread())// 指定在主线程
                .map(new Func1<WeatherInfo, Today>() 
                    @Override
                    public Today call(WeatherInfo weatherInfo) 
                        return weatherInfo.getResult().getToday();
                    
                )
                .observeOn(AndroidSchedulers.mainThread())// 指定观察者在主线程
                .subscribe(new Subscriber<Today>() 
                    ...
                    @Override
                    public void onNext(Today today) 
                        tv.setText(gson.toJson(today));
                    
                );
    

在.map()方法中我们获取WeatherInfo中Today属性,并且返回Today。然后再观察者Subscriber我们就可以直接对Today进行操作。是不是很方便?还有更方便的!

MainActivity#getWeatherInfoByFlatMap()

使用.map方法只能返回一个值,属于一对一类型。RxJava给我们提供一个更神奇的方法.flatMap()。

    private void getWeatherInfoByFlatMap(WeatherInfoService weatherInfoService, final Gson gson) 
        weatherInfoService.getWeatherInfoByRxJava()
                .subscribeOn(Schedulers.io())
                .flatMap(new Func1<WeatherInfo, Observable<Future>>()

                    @Override
                    public Observable<Future> call(WeatherInfo weatherInfo) 
                        return Observable.from(weatherInfo.getResult().getFuture());
                    
                )
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<Future>() 
                    @Override
                    public void onCompleted() 
                        tv.setText(sb);
                    

                    @Override
                    public void onError(Throwable e) 

                    

                    @Override
                    public void onNext(Future future) 
                        Log.i(TAG,gson.toJson(future));
                       sb.append(gson.toJson(future)+"\\n");


                    
                );
    

这个方法有些难以理解,下面详细介绍关键部分代码。首先在.flatMap()中 第一个参数为被观察者Observable的泛型WeatherInfo,第二个参数定义为另一个被观察者,为了叙述方便,下文称第一个被观察者A,第二个参数即另一个被观察者称为B。A的泛型为WeatherInfo。B的泛型为Future。getFuture是这么被定义的

public class WeatherInfo 
    private String resultcode;
    private String reason;
    private String error_code;
    private Result result;
    ...


public class Result 

    private SK sk;
    private Today today;
    private List<Future> future;

    public List<Future> getFuture() 
        return future;
    
    ...

原来getFuture()返回的是个List<Future>,可是在Func1的call() 返回值怎么怎么是Observable<Future>?这是因为Observable.from()会将List<Future> 拆分成一个个的Future返回,也就是说订阅者的onNext 方法将会被执行List<Future>.seze() 次!所以这里我们定义了一个sb(StringBuilder),用于将每次返回的Future拼接起来,最后在onCompleted() 中调用tv.setText(sb);

结束语

至此,RxJava/RxAndroid结合Retrofit讲解完毕,希望能对读者有所帮助。感谢耐心读到最后!

源码下载地址:0分只为共享

参考:
http://gank.io/post/560e15be2dca930e00da1083
http://blog.csdn.net/liuhongwei123888/article/details/50375897

以上是关于Android RxJava/RxAndroid结合Retrofit使用的主要内容,如果未能解决你的问题,请参考以下文章

谁来讲讲Rxjava,rxandroid中的操作符的作用

谁来讲讲Rxjava,rxandroid中的操作符的作用

谁来讲讲Rxjava,rxandroid中的操作符的作用

Android 基于ijkplayer+Rxjava+Rxandroid+Retrofit2.0+MVP+Material Design的android万能播放器aaa

RxJava 和 RxAndroid 三(生命周期控制和内存优化)

综合开源框架之RxJava/RxAndroid