Android :RxJava学习笔记之合并操作符

Posted JMW1407

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android :RxJava学习笔记之合并操作符相关的知识,希望对你有一定的参考价值。

合并操作符


组合 多个被观察者(Observable) & 合并需要发送的事件,也就是是将两个或多个被观察者合并为一个被观察者,并向观察者传递事件。

1、concat() / concatArray()

组合多个被观察者一起发送数据,合并后 按发送顺序串行执行

  • 二者区别:组合被观察者的数量,即concat()组合被观察者数量≤4个,而concatArray()则可>4个

Observable
        .concat(Observable.just(1, 2, 3),
        Observable.just(4, 5, 6),
        Observable.just(7, 8, 9),
        Observable.just(10, 11, 12))
        .subscribe(new Observer<Integer>() {
          @Override
          public void onSubscribe(Disposable d) {

          }

          @Override
          public void onNext(Integer value) {
            System.out.println("接收到了事件"+ value );
          }

          @Override
          public void onError(Throwable e) {
            System.out.println("对Error事件作出响应" );
          }

          @Override
          public void onComplete() {
            System.out.println("对Complete事件作出响应");
          }
        });

输出:

接收到了事件1
接收到了事件2
接收到了事件3
接收到了事件4
接收到了事件5
接收到了事件6
接收到了事件7
接收到了事件8
接收到了事件9
接收到了事件10
接收到了事件11
接收到了事件12
对Complete事件作出响应
// concatArray():组合多个被观察者一起发送数据(可>4个)
        // 注:串行执行
        Observable.concatArray(Observable.just(1, 2, 3),
                           Observable.just(4, 5, 6),
                           Observable.just(7, 8, 9),
                           Observable.just(10, 11, 12),
                           Observable.just(13, 14, 15))
                  .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

2、merge() / mergeArray()

组合多个被观察者一起发送数据,合并后 按时间线并行执行

二者区别:

  • 组合被观察者的数量,即merge()组合被观察者数量≤4个,而mergeArray()则可>4个
  • 区别上述concat()操作符:同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行

concat

Observable
        .concat(
            Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
            // 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
            Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)
            // 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
        )
        .subscribe(new Observer<Long>() {
          @Override
          public void onSubscribe(Disposable d) {

          }

          @Override
          public void onNext(Long value) {
            System.out.println("接收到了事件"+ value);
          }

          @Override
          public void onError(Throwable e) {
            System.out.println("对Error事件作出响应" );
          }

          @Override
          public void onComplete() {
            System.out.println("对Complete事件作出响应" );
          }
        });
    try {
      Thread.sleep(8000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

输出

接收到了事件0
接收到了事件1
接收到了事件2
接收到了事件2
接收到了事件3
接收到了事件4
对Complete事件作出响应

merge()

 Observable
        .merge(
            Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
            // 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
            Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)
            // 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
        )
        .subscribe(new Observer<Long>() {
          @Override
          public void onSubscribe(Disposable d) {

          }

          @Override
          public void onNext(Long value) {
            System.out.println("接收到了事件"+ value);
          }

          @Override
          public void onError(Throwable e) {
            System.out.println("对Error事件作出响应" );
          }

          @Override
          public void onComplete() {
            System.out.println("对Complete事件作出响应" );
          }
        });
    try {
      Thread.sleep(8000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

输出

接收到了事件0
接收到了事件2
接收到了事件1
接收到了事件3
接收到了事件2
接收到了事件4
对Complete事件作出响应

3、concatDelayError() / mergeDelayError()

 Observable.concat(
            Observable.create(new ObservableOnSubscribe<Integer>() {
              @Override
              public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
                emitter.onComplete();
              }
            }),
            Observable.just(4, 5, 6))
        .subscribe(new Observer<Integer>() {
          @Override
          public void onSubscribe(Disposable d) {

          }
          @Override
          public void onNext(Integer value) {
            System.out.println("接收到了事件"+ value );
          }

          @Override
          public void onError(Throwable e) {
            System.out.println("对Error事件作出响应" );
          }

          @Override
          public void onComplete() {
            System.out.println("对Complete事件作出响应");
          }
        });

输出

接收到了事件1
接收到了事件2
接收到了事件3
对Error事件作出响应
Observable.concatArrayDelayError(
            Observable.create(new ObservableOnSubscribe<Integer>() {
              @Override
              public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {

                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
                emitter.onComplete();
              }
            }),
            Observable.just(4, 5, 6))
        .subscribe(new Observer<Integer>() {
          @Override
          public void onSubscribe(Disposable d) {

          }
          @Override
          public void onNext(Integer value) {
            System.out.println("接收到了事件"+ value );
          }

          @Override
          public void onError(Throwable e) {
            System.out.println("对Error事件作出响应" );
          }

          @Override
          public void onComplete() {
            System.out.println("对Complete事件作出响应");
          }
        });

输出

接收到了事件1
接收到了事件2
接收到了事件3
接收到了事件4
接收到了事件5
接收到了事件6
对Error事件作出响应

4、Zip()

合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送

特别注意:

  • 事件组合方式 = 严格按照原先事件序列 进行对位合并
  • 最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量


Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
      @Override
      public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
        System.out.println("被观察者1发送了事件1");
        emitter.onNext(1);
        // 为了方便展示效果,所以在发送事件后加入2s的延迟
        Thread.sleep(1000);

        System.out.println("被观察者1发送了事件2");
        emitter.onNext(2);
        Thread.sleep(1000);

        System.out.println("被观察者1发送了事件3");
        emitter.onNext(3);
        Thread.sleep(1000);

        emitter.onComplete();
      }
    }).subscribeOn(Schedulers.io()); // 设置被观察者1在工作线程1中工作

        Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
      @Override
      public void subscribe(ObservableEmitter<String> emitter) throws Exception {
        System.out.println("被观察者2发送了事件A");
        emitter.onNext("A");
        Thread.sleep(1000);

        System.out.println("被观察者2发送了事件B");
        emitter.onNext("B");
        Thread.sleep(1000);

        System.out.println("被观察者2发送了事件C");
        emitter.onNext("C");
        Thread.sleep(1000);

        System.out.println("被观察者2发送了事件D");
        emitter.onNext("D");
        
        emitter.onComplete();
      }
    }).subscribeOn(Schedulers.newThread());// 设置被观察者2在工作线程2中工作
    // 假设不作线程控制,则该两个被观察者会在同一个线程中工作,即发送事件存在先后顺序,而不是同时发送


// 注:创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
        Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
          @Override
          public String apply(Integer integer, String string) throws Exception {
            return  integer + string;
          }
        }).subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(Disposable d) {
            System.out.println("onSubscribe");
          }

          @Override
          public void onNext(String s) {
            System.out.println("接收到了事件"+ s );
          }

          @Override
          public void onError(Throwable e) {
            System.out.println("对Error事件作出响应" );
          }

          @Override
          public void onComplete() {
            System.out.println("对Complete事件作出响应");
          }
        });

    try {
      Thread.sleep(8000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }
  }

输出

onSubscribe
被观察者1发送了事件1
被观察者2发送了事件A
接收到了事件1A
被观察者2发送了事件B
被观察者1发送了事件2
接收到了事件2B
被观察者2发送了事件C
被观察者1发送了事件3
接收到了事件3C
被观察者2发送了事件D
对Complete事件作出响应

5、combineLatest()

当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据

  • 与Zip()的区别:Zip() = 按个数合并,即1对1合并;
  • CombineLatest() = 按时间合并,即在同一个时间点上合并


可能上面这张图不是太好理解,可以看下面这张图:

combineLatest()发送事件的序列是与发送的时间线有关的。拿上图解释当发送A之后会从上一个Observ拿最近发送的1进行组合生成‘1A’,当发送2时拿第二个Observable最近发送的数据B组合成‘2B’,接下来到事件C时还是取第一个Observable最近发送的时间2进行组合成‘2C’,以此类推。

Observable.combineLatest(
        Observable.just(1L, 2L, 3L), // 第1个发送数据事件的Observable
        Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
        new BiFunction<Long, Long, Long>() {
          @Override
          public Long apply(Long o1, Long o2) throws Exception {
            // o1 = 第1个Observable发送的最新(最后)1个数据
            // o2 = 第2个Observable发送的每1个数据
            System.out.println("合并的数据是: "+ o1 + " "+ o2);
            return o1 + o2;
            // 合并的逻辑 = 相加
            // 即第1个Observable发送的最后1个数据 与 第2个Observable发送的每1个数据进行相加
          }
        }).subscribe(new Consumer<Long>() {
      @Override
      public void accept(Long s) throws Exception {
        System.out.println("合并的结果是: "+s);
      }
    });
    try {
      Thread.sleep(40000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

输出

合并的数据是: 3 0
合并的结果是: 3
合并的数据是: 3 1
合并的结果是: 4
合并的数据是: 3 2
合并的结果是: 5
Observable.combineLatest(
        Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第1个发送数据事件的Observable
        Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
        new BiFunction<Long, Long, Long>() {
          @Override
          public Long apply(Long o1, Long o2) throws Exception {
            // o1 = 第1个Observable发送的最新(最后)1个数据
            // o2 = 第2个Observable发送的每1个数据
            System.out.println("合并的数据是: "+ o1 + " "+ o2);
            return o1 + o2;
            // 合并的逻辑 = 相加
            // 即第1个Observable发送的最后1个数据 与 第2个Observable发送的每1个数据进行相加
          }
        }).subscribe(new Consumer<Long>() {
      @Override
      public void accept(Long s) throws Exception {
        System.out.println("合并的结果是: "+s);
      }
    });
    try {
      Thread.sleep(4000);
    } catch (InterruptedException e) {
      e.printStackTrace();
    }

输出

合并的数据是: 0 0
合并的结果是: 0
合并的数据是: 1 0
合并的结果是: 1
合并的数据是: 1 1
合并的结果是: 2
合并的数据是: 2 1
合并的结果是: 3
合并的数据是: 2 2
合并的结果是: 4

0               1                   2
        0               1                   2
        0      1       2          3      4

6、startWith() / startWithArray()

在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者

Observable.just(4, 5, 6)
        .startWith(0)  // 追加单个数据 = startWith()
        .startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
        .subscribe(new Observer<Integer>() {
          @Override
          public void onSubscribe(Disposable d) {
          }

          @Override
          public void onNext(Integer value) {
            System.out.println("接收到了事件"+ value );
          }

          @Override
          public void onError(Throwable e) {
            System.out.println("对Error事件作出响应" );
          }

          @Override
          public void onComplete() {
            System.out.println("对Complete事件作出响应");
          }
        });

输出

接收到了事件1
接收到了事件2
接收到了事件3
接收到了事件0
接收到了事件4
接收到了事件5
接收到了事件6
对Complete事件作出响应
Observable.just(4, 5, 6)
        .startWith(Observable.just(1, 2, 3))
        .subscribe(new Observer<Integer>() {
          @Override
          public void onSubscribe(Disposable d) {
          }

          @Override
          public void onNext(Integer value) {
            System.out.println("接收到了事件"+ value );
          }

          @Override
          public void onError(Throwable e) {
            System.out.println("对Error事件作出响应" );
          }

          @Override
          public void onComplete() {
            System.out.println("对Complete事件作出响应");
          }
        });

7、count()

统计被观察者发送事件的数量

// 注:返回结果 = Long类型
    Observable.just(1, 2, 3, 4)
        .count()
        .subscribe(new Consumer<Long>() {
          @Override
          public void accept(Long aLong) throws Exception {
            System.out.println("发送的事件数量 =  "+aLong);
          }
        });

输出

发送的事件数量 =  4

参考

1、Carson带你学Android:RxJava组合/合并操作符
2、Rxjava2 : 合并操作符
3、Android RxJava2(三)组合操作符

以上是关于Android :RxJava学习笔记之合并操作符的主要内容,如果未能解决你的问题,请参考以下文章

Android :RxJava学习笔记之条件/布尔操作符

Android :RxJava学习笔记之创建操作符

Android :RxJava学习笔记之转换操作符

Android :RxJava学习笔记之Single

Android :RxJava学习笔记之 错误处理

Android :RxJava学习笔记之Subject