一篇博客让你了解RxJava

Posted 伯努力不努力

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一篇博客让你了解RxJava相关的知识,希望对你有一定的参考价值。

RxJava可以说是2016年最流行的项目之一了,最近也接触了一下RxJava,于是想写一篇博客,希望能通过这篇博客让大家能对其进行了解,本篇博客是基于RxJava2.0,跟RxJava1.0还是有很多不同的

基础知识

RxJava的核心就是“异步”两个字,其最关键的东西就是两个:

  1. Observable(被观察者)

  2. Observer/Subscriber(观察者)

Observable可以发出一系列的 事件,这里的事件可以是任何东西,例如网络请求、复杂计算处理、数据库操作、文件操作等等,事件执行结束后交给 Observer回调处理。

Observable可以理解为事件的发送者,就好像快递的寄出者,而这些事件就好比快递
Observer可以理解为事件的接收者,就好像快递的接收者

那他们之间是如何进行联系的呢?答案就是通过subscribe()方法,下面的代码就是RXJAVA中Observable与Observer进行关联的典型方式:

//创建一个被观察者 Observable
    Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() 
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception 
            e.onNext(5);
            e.onNext(6);
            e.onNext(7);
            e.onNext(8);
            e.onComplete();
        
    );

    //创建观察者observer
    Observer<Integer> observer = new Observer<Integer>() 
        @Override
        public void onSubscribe(Disposable d) 
            Log.d(TAG, "subscribe");
        

        @Override
        public void onNext(Integer value) 
            Log.d(TAG, value.toString());
        

        @Override
        public void onError(Throwable e) 
            Log.d(TAG, "error");
        

        @Override
        public void onComplete() 
            Log.d(TAG, "complete");
        
    ;
    //建立关联
    observable.subscribe(observer);

运行项目,我们可以看到,数字已经打印出来

这里需要强调的是: 只有当观察者和被观察者建立连接之后, 被观察者才会开始发送事件. 也就是调用了subscribe()方法之后才开始发送事件.

上面我们看到观察者和被观察者的逻辑是分开写的,那能不能合在一起写呢?答案是肯定的,这也是RxJava比较突出的优点,那就是链式操作,代码如下:

Observable.create(new ObservableOnSubscribe<Integer>() 
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception 
        e.onNext(5);
        e.onNext(6);
        e.onNext(7);
        e.onNext(8);
        e.onComplete();
    
).subscribe(new Observer<Integer>() 
    @Override
    public void onSubscribe(Disposable d) 
        Log.d(TAG, "subscribe");
    

    @Override
    public void onNext(Integer value) 
        Log.d(TAG, value.toString());
    

    @Override
    public void onError(Throwable e) 
        Log.d(TAG, "error");
    

    @Override
    public void onComplete() 
        Log.d(TAG, "complete");
    
);

有时候,你可能觉得,我就打印几个数,还要把Observable写的那么麻烦,能不能简便一点呢?答案是肯定的,RxJava内置了很多简化创建Observable对象的函数,比如Observable.just就是用来创建只发出一个事件就结束的Observable对象,上面创建Observable对象的代码可以简化为一行

Observable<String> observable = Observable.just("hello");

同样对于Observer,这个例子中,我们其实并不关心OnComplete和OnError,我们只需要在onNext的时候做一些处理,这时候就可以使用Consumer类。

Observable<String> observable = Observable.just("hello");
   Consumer<String> consumer = new Consumer<String>() 
       @Override
       public void accept(String s) throws Exception 
           System.out.println(s);
       
   ;
    observable.subscribe(consumer);

其实在RxJava中,我们可以为 Observer中的三种状态根据自身需要分别创建一个回调动作,通过Action 来替代onComplete():,通过Consumer来替代 onError(Throwable t)和onNext(T t)

Observable<String> observable = Observable.just("hello");
    Action onCompleteAction = new Action() 
        @Override
        public void run() throws Exception 
            Log.i(TAG, "complete");
        
    ;
    Consumer<String> onNextConsumer = new Consumer<String>() 
        @Override
        public void accept(String s) throws Exception 
            Log.i(TAG, s);
        
    ;
    Consumer<Throwable> onErrorConsumer = new Consumer<Throwable>() 
        @Override
        public void accept(Throwable throwable) throws Exception 
            Log.i(TAG, "error");
        
    ;
    observable.subscribe(onNextConsumer, onErrorConsumer, onCompleteAction);



Observable.just同样可以发送多个参数

Observable observable = Observable.just("you", "are", "beautiful");
Consumer<String> onNextConsumer = new Consumer<String>() 
    @Override
    public void accept(String s) throws Exception 
        Log.i(TAG, s);
    
;
observable.subscribe(onNextConsumer);

例子:来一个简单的例子来了解事件的产生到消费、订阅的过程:从res/mipmap中取出一张图片,显示在ImageView上。

final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<Drawable>() 


    @Override
    public void subscribe(ObservableEmitter<Drawable> e) throws Exception 
        // 从mipmap取出一张图片作为Drawable对象
        Drawable drawable = ContextCompat.getDrawable(MainActivity.this, R.mipmap.ic_launcher);

        // 把Drawable对象发送出去
        e.onNext(drawable);
        e.onComplete();
    
).subscribe(new Observer<Drawable>() 
    @Override
    public void onSubscribe(Disposable d) 

    

    @Override
    public void onNext(Drawable value) 
        ivLogo.setImageDrawable(value);
    

    @Override
    public void onError(Throwable e) 

    

    @Override
    public void onComplete() 

    
);

这样就完成了一个简单的图片的设置

ObservableEmitter和Disposable

ObservableEmitter: ObservableEmitter可以理解为发射器,这个就是用来发出事件的,它可以发出三种类型的事件,通过调用emitter的onNext(T value)、onComplete()和onError(Throwable error)就可以分别发出next事件、complete事件和error事件。
注意:但是事件的发送是有一定的规定的,就好比寄快递也要有一定要求,不是什么都能寄的:

1.被观察者可以发送无限个onNext, 观察者也可以接收无限个onNext.
2.当Observable发送了一个onComplete后, Observable的onComplete之后的事件将会继续发送, 而Observer收到onComplete事件之后将不再继续接收事件.
3.当Observable发送了一个onError后, Observable中onError之后的事件将继续发送, 而Observer收到onError事件之后将不再继续接收事件.
4.Observable可以不发送onComplete或onError.
5.最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然

注: 关于onComplete和onError唯一并且互斥这一点, 是需要自行在代码中进行控制, 如果你的代码逻辑中违背了这个规则, 并不一定会导致程序崩溃. 比如发送多个onComplete是可以正常运行的, 依然是收到第一个onComplete就不再接收了, 但若是发送多个onError, 则收到第二个onError事件会导致程序会崩溃.当我们写多个onComplete时,不会报错

当我们又有onComplete又有onError时,发现在调用onComplete后会爆出异常

Observable.create(new ObservableOnSubscribe<Integer>() 
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception 
        e.onNext(5);
        e.onNext(6);
        e.onNext(7);
        e.onNext(8);
        
        e.onError(new NullPointerException());
        e.onComplete();
    
)

这是onComplete在onError前调用的情况

当我们写两个onError时,会先接受前面的所有事件,最后才报错

介绍了ObservableEmitter, 接下来介绍Disposable, 当调用dispose()方法时, 它就会将观察者和被观察者的联系切断, 从而导致观察者收不到事件.

注意: 调用dispose()并不会导致Observable不再继续发送事件, Observable会继续发送剩余的事件.
看一下下面这个例子:

Observable.create(new ObservableOnSubscribe<Integer>() 
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception 
            Log.d(TAG, "emitter 1");
            emitter.onNext(1);
            Log.d(TAG, "emitter 2");
            emitter.onNext(2);
            Log.d(TAG, "emitter 3");
            emitter.onNext(3);
            Log.d(TAG, "emitter complete");
            emitter.onComplete();
            Log.d(TAG, "emitter 4");
            emitter.onNext(4);
        
    ).subscribe(new Observer<Integer>() 
        private Disposable mDisposable;
        private int i;

        @Override
        public void onSubscribe(Disposable d) 
            Log.d(TAG, "subscribe");
            mDisposable = d;
        

        @Override
        public void onNext(Integer value) 
            Log.d(TAG, "onNext: " + value);
            i++;
            if (i == 2) 
                Log.d(TAG, "dispose");
                mDisposable.dispose();
                Log.d(TAG, "isDisposed : " + mDisposable.isDisposed());
            
        

        @Override
        public void onError(Throwable e) 
            Log.d(TAG, "error");
        

        @Override
        public void onComplete() 
            Log.d(TAG, "complete");
        
    );

打印如下:

在收到onNext 2这个事件后, 我们中断了联系, 但是Observable
仍然发送了3, complete, 4这几个事件, 而且Observable
并没有因为发送了onComplete而停止. 同时可以看到Observer的onSubscribe()方法是最先调用的.

subscribe()有多个重载的方法:

 public final Disposable subscribe() 
 public final Disposable subscribe(Consumer<? super T> onNext) 
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError)  
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) 
 public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) 
 public final void subscribe(Observer<? super T> observer) 

不带任何参数的subscribe() 表示Observer不关心任何事件,Observable发送什么数据都随你
带有一个Consumer参数的方法表示Observer只关心onNext事件, 其他的事件我假装没看见, 因此我们如果只需要onNext事件可以这么写:

Observable.create(new ObservableOnSubscribe<Integer>() 
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception 
            Log.d(TAG, "emitter 1");
            emitter.onNext(1);
            Log.d(TAG, "emitter 2");
            emitter.onNext(2);
            Log.d(TAG, "emitter 3");
            emitter.onNext(3);
            Log.d(TAG, "emitter complete");
            emitter.onComplete();
            Log.d(TAG, "emitter 4");
            emitter.onNext(4);
        
    ).subscribe(new Consumer<Integer>() 
        @Override
        public void accept(Integer integer) throws Exception 
            Log.d(TAG, "onNext: " + integer);
        
    );

其他方式也是类似的方式

线程调度

正常情况下, Observer和Observable是工作在同一个线程中的, 也就是说Observable在哪个线程发事件, Observer就在哪个线程接收事件.
RxJava中, 当我们在主线程中去创建一个Observable来发送事件, 则这个Observable默认就在主线程发送事件.
当我们在主线程去创建一个Observer来接收事件, 则这个Observer默认就在主线程中接收事件,但其实在现实工作中我们更多的是需要进行线程切换的,最常见的例子就是在子线程中请求网络数据,在主线程中进行展示

要达到这个目的, 我们需要先改变Observable发送事件的线程, 让它去子线程中发送事件, 然后再改变Observer的线程, 让它去主线程接收事件. 通过RxJava内置的线程调度器可以很轻松的做到这一点. 接下来看一段代码:

Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() 
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception 
            Log.d(TAG, "Observable thread is : " + Thread.currentThread().getName());
            Log.d(TAG, "emitter 1");
            emitter.onNext(1);
        
    );

    Consumer<Integer> consumer = new Consumer<Integer>() 
        @Override
        public void accept(Integer integer) throws Exception 
            Log.d(TAG, "Observer thread is :" + Thread.currentThread().getName());
            Log.d(TAG, "onNext: " + integer);
        
    ;

    observable.subscribeOn(Schedulers.newThread())
            .observeOn(androidSchedulers.mainThread())
            .subscribe(consumer);


可以看到, observable发送事件的线程的确改变了, 是在一个叫 RxNewThreadScheduler-1的线程中发送的事件, 而consumer 仍然在主线程中接收事件, 这说明我们的目的达成了, 接下来看看是如何做到的.

这段代码只不过是增加了两行代码:

.subscribeOn(Schedulers.newThread())
        .observeOn(AndroidSchedulers.mainThread())

简单的来说, subscribeOn() 指定的是Observable发送事件的线程, observeOn() 指定的是Observer接收事件的线程.
多次指定Observable的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
多次指定Observer的线程是可以的, 也就是说每调用一次observeOn() , Observer的线程就会切换一次.例如:

observable.subscribeOn(Schedulers.newThread())
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .observeOn(Schedulers.io())
        .subscribe(consumer);

这段代码中指定了两次上游发送事件的线程, 分别是newThread和IO线程, 下游也指定了两次线程,分别是main和IO线程. 运行结果为:

可以看到, Observable虽然指定了两次线程, 但只有第一次指定的有效, 依然是在RxNewThreadScheduler线程中, 而Observer则跑到了RxCachedThreadScheduler 中, 这个CacheThread其实就是IO线程池中的一个.

在 RxJava 中,提供了一个名为 Scheduler 的线程调度器,RxJava 内部提供了4个调度器,分别是:

Schedulers.io(): I/O 操作(读写文件、数据库、网络请求等),与newThread()差不多,区别在于io() 的内部实现是是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 效率比 newThread() 更高。值得注意的是,在 io() 下,不要进行大量的计算,以免产生不必要的线程;

Schedulers.newThread(): 开启新线程操作;

Schedulers.immediate(): 默认指定的线程,也就是当前线程;

Schedulers.computation():计算所使用的调度器。这个计算指的是 CPU 密集型计算,即不会被 I/O等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。值得注意的是,不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU;

AndroidSchedulers.mainThread(): RxJava 扩展的 Android 主线程;

这些内置的Scheduler已经足够满足我们开发的需求, 因此我们应该使用内置的这些选项, 在RxJava内部使用的是线程池来维护这些线程, 所有效率也比较高.

例子:还是用之前设置图片的例子,这次我们在子线程中进行网络请求获取图片,在主线程中对图片进行设置

final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<Drawable>() 


    @Override
    public void subscribe(ObservableEmitter<Drawable> e) throws Exception 
        try 
            Drawable drawable = Drawable.createFromStream(new URL("https://ss2.baidu.com/6ONYsjip0QIZ8tyhnq/it/u=2502144641,437990411&fm=80&w=179&h=119&img.JPEG").openStream(), "src");
            e.onNext(drawable);
         catch (IOException error) 
            e.onError(error);
        
    
)// 指定 subscribe() 所在的线程,也就是上面subscribe()方法调用的线程
        .subscribeOn(Schedulers.io())
        // 指定 Observer 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Drawable>() 
    @Override
    public void onSubscribe(Disposable d) 

    

    @Override
    public void onNext(Drawable value) 
        ivLogo.setImageDrawable(value);
    

    @Override
    public void onError(Throwable e) 

    

    @Override
    public void onComplete() 

    
);

这段代码就做一件事,在 io 线程加载一张网络图片,加载完毕之后在主线程中显示到ImageView上。

操作符的使用

在了解基本知识和线程调度后,我们来学习一下RxJava各种神奇的操作符

Map
Map是RxJava中最简单的一个变换操作符了, 它的作用就是对Observable发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化.

Observable.create(new ObservableOnSubscribe<Integer>() 
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception 
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        
    ).map(new Function<Integer, String>() 
        @Override
        public String apply(Integer integer) throws Exception 
            return "This is result " + integer;
        
    ).subscribe(new Consumer<String>() 
        @Override
        public void accept(String s) throws Exception 
            Log.d(TAG, s);
        
    );

在Observable我们发送的是数字类型, 而在Observer我们接收的是String类型, 中间起转换作用的就是Map操作符, 运行结果为:

通过Map, 可以将Observable发来的事件转换为任意的类型, 可以是一个Object, 也可以是一个集合,功能非常强大

例子:还是以图片加载的例子,我们传进来一个图片的路径,然后通过Map进行转换成drawble再发送给观察者

final ImageView ivLogo = (ImageView) findViewById(R.id.logo);
Observable.create(new ObservableOnSubscribe<String>() 
    @Override
    public void subscribe(ObservableEmitter<String> e) throws Exception 
        e.onNext("https://ss2.baidu.com/-vo3dSag_xI4khGko9WTAnF6hhy/image/h%3D200/sign=4db5130a073b5bb5a1d727fe06d2d523/cf1b9d16fdfaaf51965f931e885494eef11f7ad6.jpg");
    
).map(new Function<String, Drawable>() 
    @Override
    public Drawable apply(String url) throws Exception 
        try 
            Drawable drawable = Drawable.createFromStream(new URL(url).openStream(), "src");
            return drawable;
         catch (IOException e) 

        
        return null;
    
)  .subscribeOn(Schedulers.io())
        // 指定 Observer 回调方法所在的线程,也就是onCompleted, onError, onNext回调的线程
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Drawable>() 
            @Override
            public void onSubscribe(Disposable d) 

            

            @Override
            public void onNext(Drawable value) 
                if (value != null) 
                    ivLogo.setImageDrawable(value);
                
            

            @Override
            public void onError(Throwable e) 
                Log.e(TAG, e.toString());
            

            @Override
            public void onComplete() 

            
        );

效果如下:

经过改写代码后,有什么变化呢? Observable 创建了一个 String 事件,也就是产生一个url,通过 map 操作符进行变换,返回Drawable对象,这个变换指的就是通过url进行网络图片请求,返回一个Drawable。所以简单的来说就是把String事件,转换为Drawable事件。逻辑表示就是
Observable --> map变换 --> Observable

FlatMap
FlatMap将一个发送事件的Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.

Observable每发送一个事件, flatMap都将对其进行转换, 然后发送转换之后的新的事件, Observer接收到的就是转换后发送的数据. 这里需要注意的是, flatMap并不保证事件的顺序, 如果需要保证顺序则需要使用concatMap.

 Observable.create(new ObservableOnSubscribe<Integer>() 
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception 
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        
    ).flatMap(new Function<Integer, ObservableSource<String>>() 
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception 
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) 
                list.add("I am value " + integer);
            
            return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
        
    ).subscribe(new Consumer<String>() 
        @Override
        public void accept(String s) throws Exception 
            Log.d(TAG, s);
        
    );

效果如下:

Map 与 flatMap 这两个操作符的共同点在于,他们都是把一个对象转换为另一个对象,但须注意以下这些特点:

1.flatMap 返回的是一个Observable对象,而 map 返回的是一个普通转换后的对象;
2.flatMap 返回的Observable对象并不是直接发送到Subscriber的回调中,而是重新创建一个Observable对象,并激活这个Observable对象,使之开始发送事件;而 map 变换后返回的对象直接发到Subscriber回调中;
3.flatMap 变换后产生的每一个Observable对象发送的事件,最后都汇入同一个Observable,进而发送给Subscriber回调;
4.map返回类型 与 flatMap 返回的Observable事件类型,可以与原来的事件类型一样;
5.可以对一个Observable多次使用 map 和 flatMap;

鉴于 flatMap 自身强大的功能,这常常被用于 嵌套的异步操作,例如嵌套网络请求。传统的嵌套请求,一般都是在前一个请求的 onSuccess() 回调里面发起新的请求,这样一旦嵌套多个的话,缩进就是大问题了,而且严重的影响代码的可读性。而RxJava嵌套网络请求仍然通过链式结构,保持代码逻辑的清晰!举个栗子:

public interface Api 
    @GET
    Observable<LoginResponse> login(@Body LoginRequest request);

    @GET
    Observable<RegisterResponse> register(@Body RegisterRequest request);


接着创建一个Retrofit客户端:

private static Retrofit create() 
    OkHttpClient.Builder builder = new OkHttpClient().newBuilder();
    builder.readTimeout(10, TimeUnit.SECONDS);
    builder.connectTimeout(9, TimeUnit.SECONDS);

    if (BuildConfig.DEBUG) 
        HttpLoggingInterceptor interceptor = new HttpLoggingInterceptor();
        interceptor.setLevel(HttpLoggingInterceptor.Level.BODY);
        builder.addInterceptor(interceptor);
    

    return new Retrofit.Builder().baseUrl(ENDPOINT)
            .client(builder.build())
            .addConverterFactory(GsonConverterFactory.create())
            .addCallAdapterFactory(RxJava2CallAdapterFactory.create())
            .build();

发起请求就很简单了:

Api api = retrofit.create(Api.class);
api.login(request)
        .subscribeOn(Schedulers.io())               //在IO线程进行网络请求
        .observeOn(AndroidSchedulers.mainThread())  //回到主线程去处理请求结果
        .subscribe(new Observer<LoginResponse>() 
            @Override
            public void onSubscribe(Disposable d) 

            @Override
            public void onNext(LoginResponse value) 

            @Override
            public void onError(Throwable e) 
                Toast.makeText(mContext, "登录失败", Toast.LENGTH_SHORT).show();
            

            @Override
            public void onComplete() 
                Toast.makeText(mContext, "登录成功", Toast.LENGTH_SHORT).show();
            
        );

concatMap
这里也简单说一下concatMap吧, 它和flatMap的作用几乎一模一样, 只是它的结果是严格按照上游发送的顺序来发送的, 来看个代码吧:

Observable.create(new ObservableOnSubscribe<Integer>() 
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception 
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
        
    ).concatMap(new Function<Integer, ObservableSource<String>>() 
        @Override
        public ObservableSource<String> apply(Integer integer) throws Exception 
            final List<String> list = new ArrayList<>();
            for (int i = 0; i < 3; i++) 
                list.add("I am value " + integer);
            
            return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
        
    ).subscribe(new Consumer<String>() 
        @Override
        public void accept(String s) throws Exception 
            Log.d(TAG, s);
        
    );

只是将之前的flatMap改为了concatMap, 其余原封不动, 运行结果如下:

可以看到, 结果仍然是有序的.

ZIP
Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数。它只发射与发射数据项最少的那个Observable一样多的数据。

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() 
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception 
            Log.d(TAG, "emitter 1");
            emitter.onNext(1);
            Log.d(TAG, "emitter 2");
            emitter.onNext(2);
            Log.d(TAG, "emitter 3");
            emitter.onNext(3);
            Log.d(TAG, "emitter 4");
            emitter.onNext(4);
            Log.d(TAG, "emit complete1");
            emitter.onComplete();
        
    );

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() 
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception 
            Log.d(TAG, "emitter A");
            emitter.onNext("A");
            Log.d(TAG, "emitter B");
            emitter.onNext("B");
            Log.d(TAG, "emitter C");
            emitter.onNext("C");
            Log.d(TAG, "emitter complete2");
            emitter.onComplete();
        
    );

    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() 
        @Override
        public String apply(Integer integer, String s) throws Exception 
            return integer + s;
        
    ).subscribe(new Observer<String>() 
        @Override
        public void onSubscribe(Disposable d) 
            Log.d(TAG, "onSubscribe");
        

        @Override
        public void onNext(String value) 
            Log.d(TAG, "onNext: " + value);
        

        @Override
        public void onError(Throwable e) 
            Log.d(TAG, "onError");
        

        @Override
        public void onComplete() 
            Log.d(TAG, "onComplete");
        
    );

我们分别创建了observable, 一个发送1,2,3,4,Complete, 另一个发送A,B,C,Complete, 接着用Zip把发出的事件组合, 来看看运行结果吧:

观察发现observable1发送事件后,observable2才发送
这是因为我们两个observable都是运行在同一个线程里, 同一个线程里执行代码肯定有先后顺序呀.

Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() 
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception 
            Log.d(TAG, "emit 1");
            emitter.onNext(1);
            Thread.sleep(1000);

            Log.d(TAG, "emit 2");
            emitter.onNext(2);
            Thread.sleep(1000);

            Log.d(TAG, "emit 3");
            emitter.onNext(3);
            Thread.sleep(1000);

            Log.d(TAG, "emit 4");
            emitter.onNext(4);
            Thread.sleep(1000);

            Log.d(TAG, "emit complete1");
            emitter.onComplete();
        
    ).subscribeOn(Schedulers.io());

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() 
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception 
            Log.d(TAG, "emit A");
            emitter.onNext("A");
            Thread.sleep(1000);

            Log.d(TAG, "emit B");
            emitter.onNext("B");
            Thread.sleep(1000);

            Log.d(TAG, "emit C");
            emitter.onNext("C");
            Thread.sleep(1000);

            Log.d(TAG, "emit complete2");
            emitter.onComplete();
        
    ).subscribeOn(Schedulers.io());

    Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() 
        @Override
        public String apply(Integer integer, String s) throws Exception 
            return integer + s;
        
    ).subscribe(new Observer<String>() 
        @Override
        public void onSubscribe(Disposable d) 
            Log.d(TAG, "onSubscribe");
        

        @Override
        public void onNext(String value) 
            Log.d(TAG, "onNext: " + value);
        

        @Override
        public void onError(Throwable e) 
            Log.d(TAG, "onError");
        

        @Override
        public void onComplete() 
            Log.d(TAG, "onComplete");
        
    );

好了, 这次我们让事件都在IO线程里发送事件, 再来看看运行结果:

第一个observable明明发送了四个数据+一个Complete, 之前明明还有的, 为啥到这里没了呢?
这是因为我们之前说了, zip发送的事件数量跟observable中发送事件最少的那一个的事件数量是有关的, 在这个例子里我们observable2只发送了三个事件然后就发送了Complete, 这个时候尽管observable1还有事件4 和事件Complete 没有发送, 但是它们发不发送还有什么意义呢?

from

在RxJava的from操作符到2.0已经被拆分成了3个,fromArray, fromIterable, fromFuture接收一个集合作为输入,然后每次输出一个元素给subscriber。

Observable.fromArray(new Integer[]1, 2, 3, 4, 5).subscribe(new Consumer<Integer>() 
    @Override
    public void accept(Integer integer) throws Exception 
        Log.i(TAG, "number:" + integer);
    
);

注意:如果from()里面执行了耗时操作,即使使用了subscribeOn(Schedulers.io()),仍然是在主线程执行,可能会造成界面卡顿甚至崩溃,所以耗时操作还是使用Observable.create(…);

filter
条件过滤,去除不符合某些条件的事件。举个栗子:

Observable.fromArray(new Integer[]1, 2, 3, 4, 5)
       .filter(new Predicate<Integer>() 
           @Override
           public boolean test(Integer integer) throws Exception 
               // 偶数返回true,则表示剔除奇数,留下偶数
               return integer % 2 == 0;

           
       ).subscribe(new Consumer<Integer>() 
    @Override
    public void accept(Integer integer) throws Exception 
        Log.i(TAG, "number:" + integer);
    
);

take
最多保留的事件数。

 Observable.just("1", "2", "6", "3", "4", "5").take(2).subscribe(new Observer<String>() 
            @Override
            public void onSubscribe(Disposable d) 

            

            @Override
            public void onNext(String value) 
                Log.d(TAG,value);
            

            @Override
            public void onError(Throwable e) 

            

            @Override
            public void onComplete() 

            
        );


可以发现我们发送了6个String,最后只打印了前两个,这就是take过滤掉的结果

doOnNext
如果你想在处理下一个事件之前做某些事,就可以调用该方法

Observable.fromArray(new Integer[]1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12).filter(new Predicate<Integer>() 
    @Override
    public boolean test(Integer integer) throws Exception 
        // 偶数返回true,则表示剔除奇数
        return integer % 2 == 0;
    
)// 最多保留三个,也就是最后剩三个偶数
        .take(3).doOnNext(new Consumer<Integer>() 
    @Override
    public void accept(Integer integer) throws Exception 
        // 在输出偶数之前输出它的hashCode
        Log.i(TAG, "hahcode = " + integer.hashCode() + "");
    
).subscribe(new Observer<Integer>() 
    @Override
    public void onSubscribe(Disposable d) 

    

    @Override
    public void onNext(Integer value) 
        Log.i(TAG, "number = " + value);
    

    @Override
    public void onError(Throwable e) 

    

    @Override
    public void onComplete() 

    
);

debounce
debounce也是用于事件的过滤,可以指定过滤事件的时间间隔

Observable.create(new ObservableOnSubscribe<Integer>() 
    @Override
    public void subscribe(ObservableEmitter<Integer> e) throws Exception 
        int i = 0;
        int[] times = new int[]100, 1000;
        while (true) 
            i++;
            if (i >= 100)
                break;
            e.onNext(i);
            try 
                // 注意!!!!
                // 当i为奇数时,休眠1000ms,然后才发送i+1,这时i不会被过滤掉
                // 当i为偶数时,只休眠100ms,便发送i+1,这时i会被过滤掉
                Thread.sleep(times[i % 2]);
             catch (InterruptedException error) 
                error.printStackTrace();
            
        
        e.onComplete();
    
)// 间隔400ms以内的事件将被丢弃
        .debounce(400, TimeUnit.MILLISECONDS).subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(new Observer<Integer>() 


            @Override
            public void onError(Throwable e) 
                Log.e(TAG, e.toString());
            

            @Override
            public void onComplete() 
                Log.i(TAG, "complete");
            

            @Override
            public void onSubscribe(Disposable d) 

            

            @Override
            public void onNext(Integer integer) 
                Log.i(TAG, "integer = " + integer);
            
        );

compose
与 flatMap 类似,都是进行变换,返回Observable对象,激活并发送事件。
1.compose 是唯一一个能够从数据流中得到原始Observable的操作符,所以,那些需要对整个数据流产生作用的操作(比如,subscribeOn()和observeOn())需要使用 compose 来实现。相较而言,如果在flatMap()中使用subscribeOn()或者observeOn(),那么它仅仅对在flatMap 中创建的Observable起作用,而不会对剩下的流产生影响。这样就可以简化subscribeOn()以及observeOn()的调用次数了。
2.compose 是对 Observable 整体的变换,换句话说, flatMap 转换Observable里的每一个事件,而 compose 转换的是整个Observable数据流。
3.flatMap 每发送一个事件都创建一个 Observable,所以效率较低。而 compose 操作符只在主干数据流上执行操作。
4.建议使用 compose 代替 flatMap。

First
只发送符合条件的第一个事件。可以与contact操作符,做网络缓存。
例子:依次检查Disk与Network,如果Disk存在缓存,则不做网络请求,否则进行网络请求。

// 从缓存获取
        Observable<BookList> fromDisk = Observable.create(new Observable.OnSubscribe<BookList>() 
            @Override
            public void call(Subscriber<? super BookList> subscriber) 
                BookList list = getFromDisk();
                if (list != null) 
                    subscriber.onNext(list);
                 else 
                    subscriber.onCompleted();
                
            
        );

// 从网络获取
        Observable<BookList> fromNetWork = bookApi.getBookDetailDisscussionList();

        Observable.concat(fromDisk, fromNetWork)
                // 如果缓存不为null,则不再进行网络请求。反之
                .first()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Subscriber<BookList>() 
                    @Override
                    public void onCompleted() 

                    

                    @Override
                    public void onError(Throwable e) 

                    

                    @Override
                    public void onNext(BookList discussionList) 

                    
                );

Single
Single与Observable类似,相当于是他的精简版。订阅者回调的不是OnNext/OnError/onCompleted,而是回调OnSuccess/OnError。

Single.create(new SingleOnSubscribe<Object>() 
    @Override
    public void subscribe(SingleEmitter<Object> e) throws Exception 
        e.onSuccess("hello world");
    
).subscribe(new SingleObserver<Object>() 
    @Override
    public void onSubscribe(Disposable d) 

    

    @Override
    public void onSuccess(Object value) 
        Log.i(TAG, value.toString());
    

    @Override
    public void onError(Throwable e) 

    
);

可以配合debounce,避免SearchEditText频繁请求。

final Subject subject = PublishSubject.create();

subject.debounce(400, TimeUnit.MILLISECONDS)
        .subscribe(new Observer() 
            

            @Override
            public void onError(Throwable e) 

            

            @Override
            public void onComplete() 

            

            @Override
            public void onSubscribe(Disposable d) 
                
            

            @Override
            public void onNext(Object o) 
                // request
            
        );

edittext.addTextChangedListener(new TextWatcher() 

    @Override
    public void beforeTextChanged(CharSequence s, int start, int count, int after)  

    @Override
    public void onTextChanged(CharSequence s, int start, int before, int count) 
        subject.onNext(s.toString());
    

    @Override
    public void afterTextChanged(Editable s)  
);

RxJava的一些使用场景

场景1:
取数据,首先检查内存是否有缓存
然后检查文件缓存中是否有
最后才从网络中取
前面任何一个条件满足,就不会执行后面的

final Observable<String> memory = Observable.create(new ObservableOnSubscribe<String>() 
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception 
        if (memoryCache != null) 
            emitter.onNext(memoryCache);
         else 
            emitter.onComplete();
        
    
);

final Observable<String> disk  = Observable.create(new ObservableOnSubscribe<String>() 
    String cachePref = getSharedPreferences("rxdeni",MODE_PRIVATE).getString("cache",null);
    @Override
    public void subscribe(ObservableEmitter<String> emitter) throws Exception 
        if (cachePref != null) 
            emitter.onNext(cachePref);
         else 
            emitter.onComplete();
        
    
);

Observable<String> network = Observable.just("network");

//主要就是靠concat operator来实现
Observable.concat(memory, disk, network).firstElement()

        .subscribeOn(Schedulers.newThread())
        .subscribe(new Consumer<String>() 
            @Override
            public void accept(String s) throws Exception 
                System.out.println("--------------subscribe: " + s);
            
        );

场景2:界面需要等到多个接口并发取完数据,再更新

Observable<String> observable1 = Observable.create(new ObservableOnSubscribe<String>() 
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception 
            e.onNext("haha");
        
    ).subscribeOn(Schedulers.newThread());

    Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() 
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception 
            e.onNext("hehe");
        
    ).subscribeOn(Schedulers.newThread());


    Observable.merge(observable1, observable2)
            .subscribeOn(Schedulers.newThread())
            .subscribe(new Observer<String>() 
                @Override
                public void onSubscribe(Disposable d) 

                

                @Override
                public void onNext(String value) 
                    Log.d(TAG,value);
                

                @Override
                public void onError(Throwable e) 

                

                @Override
                public void onComplete() 

                
            );

场景3:界面按钮需要防止连续点击的情况

RxView.clicks(button)
        .throttleFirst(1, TimeUnit.SECONDS)
        .subscribe(new Observer<Object>() 
            @Override
            public void onCompleted() 

            

            @Override
            public void onError(Throwable e) 

            

            @Override
            public void onNext(Object o) 
                Log.i(TAG, "do clicked!");
            
        );

场景4:响应式的界面
比如勾选了某个checkbox,自动更新对应的preference

SharedPreferences preferences = PreferenceManager.getDefaultSharedPreferences(this);
RxSharedPreferences rxPreferences = RxSharedPreferences.create(preferences);

Preference<Boolean> checked = rxPreferences.getBoolean("checked", true);

CheckBox checkBox = (CheckBox) findViewById(R.id.cb_test);
RxCompoundButton.checkedChanges(checkBox)
        .subscribe(checked.asAction());

场景5:复杂的数据变换

Observable.just("1", "2", "6", "3", "4", "5")
        .map(new Function<String, Integer>() 
            @Override
            public Integer apply(String s) throws Exception 
                return Integer.parseInt(s);
            
        ).filter(new Predicate<Integer>() 
    @Override
    public boolean test(Integer integer) throws Exception 
        return integer.intValue()%2 == 0;
    
).distinct().take(2).reduce(new BiFunction<Integer, Integer, Integer>() 
    @Override
    public Integer apply(Integer integer, Integer integer2) throws Exception 
        return integer.intValue() + integer2.intValue();
    
).subscribe(new Consumer<Integer>() 
    @Override
    public void accept(Integer integer) throws Exception 
       Log.d(TAG,integer.toString());
    
);

最后,推送一下自己的微信公众号,喜欢的同学可以关注。

以上是关于一篇博客让你了解RxJava的主要内容,如果未能解决你的问题,请参考以下文章

RxJava学习资料

一文带你全面了解RxJava

Android--带你一点点封装项目 MVP+BaseActivity+Retrofit+Dagger+RxJava

借助JDK8和RxJava如何让你的业务代码跑的更快

原创专栏RxJava源代码剖析

3种方式!让你3秒在51CTO博客快速发布一篇博文