Android :RxJava学习笔记之条件/布尔操作符

Posted JMW1407

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android :RxJava学习笔记之条件/布尔操作符相关的知识,希望对你有一定的参考价值。

条件/布尔操作符

通过设置函数,判断被观察者(Observable)发送的事件是否符合条件

1、all()

作用:判断发送的每项数据是否都满足 设置的函数条件

  • 若满足,返回 true;否则,返回 false

  Observable.just(1,2,3,4,5,6)
                .all(new Predicate<Integer>(){
                    @Override
                    public boolean test( Integer integer) throws Exception {
                        return (integer<=10);
                        // 该函数用于判断Observable发送的10个数据是否都满足integer<=10
                    }
                }).subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.d(TAG,"result is "+ aBoolean);
                // 输出返回结果
            }

        });


因为所有数据都满足函数内条件 (每项数据<=10)

2、takeWhile()

作用:判断发送的每项数据是否满足 设置函数条件

  • 若发送的数据满足该条件,则发送该项数据;否则不发送

发射原始Observable,直到你指定的某个条件不成立的那一刻,它停止发射原始Observable,并终止自己的Observable。

// 1. 每1s发送1个数据 = 从0开始,递增1,即0、1、2、3
        Observable.interval(1, TimeUnit.SECONDS)
                // 2. 通过takeWhile传入一个判断条件
                .takeWhile(new Predicate<Long>(){
                    @Override
                    public boolean test( Long integer) throws Exception {
                        return (integer<3);
                        // 当发送的数据满足<3时,才发送Observable的数据
                    }
                }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Long value) {
                Log.d(TAG,"发送了事件 "+ value);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

3、skipWhile()

作用:判断发送的每项数据是否满足 设置函数条件,

  • 直到该判断条件 = false时,才开始发送Observable的数据
  • takeWhile与skipWhile操作符作用相反
// 1. 每隔1s发送1个数据 = 从0开始,每次递增1
        Observable.interval(1, TimeUnit.SECONDS)
                // 2. 通过skipWhile()设置判断条件
                .skipWhile(new Predicate<Long>(){
                    @Override
                    public boolean test( Long aLong) throws Exception {
                        return (aLong<5);
                        // 直到判断条件不成立 = false = 发射的数据≥5,才开始发送数据
                    }
                }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Long value) {
                Log.d(TAG,"发送了事件 "+ value);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });

4、takeUntil()

订阅并开始发射原始Observable,它还监视你提供的第二个Observable。如果第二个Observable发射了一项数据或者发射了一个终止通知( onError通知或一个onCompleted通知),TakeUntil返回的Observable会停止发射原始Observable并终止。

// 1. 每1s发送1个数据 = 从0开始,递增1,即0、1、2、3
        Observable.interval(1, TimeUnit.SECONDS)
                // 2. 通过takeUntil的Predicate传入判断条件
                .takeUntil(new Predicate<Long>(){
                    @Override
                    public boolean test( Long integer) throws Exception {
                        return (integer>3);
                        // 返回true时,就停止发送事件
                        // 当发送的数据满足>3时,就停止发送Observable的数据
                    }
                }).subscribe(new Observer<Long>() {
            @Override
            public void onSubscribe(Disposable d) {
            }

            @Override
            public void onNext(Long value) {
                Log.d(TAG,"发送了事件 "+ value);
            }

            @Override
            public void onError(Throwable e) {
            }

            @Override
            public void onComplete() {
            }
        });


对比
  Observable.interval(1, TimeUnit.SECONDS)
                // 2. 通过takeWhile传入一个判断条件
                .takeWhile(new Predicate<Long>(){
                    @Override
                    public boolean test( Long integer) throws Exception {
                        return (integer<3);
                        // 当发送的数据满足<3时,才发送Observable的数据
                    }
                }).


该判断条件也可以是Observable,即 等到 takeUntil() 传入的Observable开始发送数据,(原始)第1个Observable的数据停止发送数据

// (原始)第1个Observable:每隔1s发送1个数据 = 从0开始,每次递增1
        Observable.interval(1, TimeUnit.SECONDS)
                // 第2个Observable:延迟5s后开始发送1个Long型数据
                .takeUntil(Observable.timer(5, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }

                });

当第 5s 时,第2个 Observable 开始发送数据,于是(原始)第1个 Observable 停止发送数据

5、skipUntil()

作用:等到 skipUntil() 传入的Observable开始发送数据,(原始)第1个Observable的数据才开始发送数据

  • SkipUntil订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。

                // (原始)第1个Observable:每隔1s发送1个数据 = 从0开始,每次递增1
        Observable.interval(1, TimeUnit.SECONDS)
                // 第2个Observable:延迟5s后开始发送1个Long型数据
                .skipUntil(Observable.timer(5, TimeUnit.SECONDS))
                .subscribe(new Observer<Long>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Long value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }

                });

6、SequenceEqual()

判定两个Observables是否发射相同的数据序列。 传递两个Observable给SequenceEqual操作符,它会比较两个Observable的发射物

  • 如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射true,否则发射false。
Observable.sequenceEqual(
                Observable.just(4,5,6),
                Observable.just(4,5,6)
        )
                .subscribe(new Consumer<Boolean>() {
                    @Override
                    public void accept( Boolean aBoolean) throws Exception {
                        Log.d(TAG,"2个Observable是否相同:"+ aBoolean);
                        // 输出返回结果
                    }
                });

7、Contains、IsEmpty 、exists

7.1、Contains

Contains操作符用来判断源Observable所发射的数据是否包含某一个数据

  • 包含会返回true,如果源Observable已经结束了却还没有发射这个数据则返回false。
  • 内部实现 = exists()

Observable.just(1,2,3,4,5,6)
                .contains(4)
                .subscribe(new Consumer<Boolean>() {
            @Override
            public void accept(Boolean aBoolean) throws Exception {
                Log.d(TAG,"result is "+ aBoolean);
                // 输出返回结果
            }

        });

测试结果:因为发送的数据中包含4

7.2、IsEmpty

IsEmpty:相关的一个操作符IsEmpty用于判定原始Observable是否没有发射任何数据。

  • 若为空,返回 true;否则,返回 false

Observable.just(1,2,3,4,5,6)
          .isEmpty() // 判断发送的数据中是否为空
        }).subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                Log.d(TAG,"result is "+ aBoolean); 
                // 输出返回结果
            }
        });

因为发送的数据不为空

7.3、exists

RxJava中还有一个exists操作符,它通过一个谓词函数测试原始Observable发射的数据

  • 只要任何一项满足条件就返回一个发射true的Observable,否则返回一个发射false的Observable。

//Contains:判定一个Observable是否发射一个特定的值
Observable.just(4,5,6)
        .contains(4)
        .subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                System.out.println("contains(4):"+aBoolean);
            }
        });
//isEmpty:判定原始Observable是否没有发射任何数据
Observable.just(4,5,6)
        .isEmpty()
        .subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                System.out.println("isEmpty():"+aBoolean));
            }
        });

//exists操作符,它通过一个谓词函数测试原始Observable发射的数据,
// 只要任何一项满足条件就返回一个发射true的Observable,
// 否则返回一个发射false的Observable。
Observable.just(4,5,6)
        .exists(new Func1<Integer, Boolean>() {
            @Override
            public Boolean call(Integer integer) {
                return integer<5;
            }
        })
        .subscribe(new Action1<Boolean>() {
            @Override
            public void call(Boolean aBoolean) {
                System.out.println("exists():"+aBoolean);
            }
        });

输出

contains(4):true 
isEmpty():false 
exists():true

8、amb()

传递两个或多个(至多9个)Observable给Amb时,它只发射其中首先发射数据或通知(onError或onCompleted)的那个Observable的所有数据,而其他所有的Observable的发射物将被丢弃。

有一个类似的对象方法ambWith。Observable.amb(o1,o2)和o1.ambWith(o2)是等价的。

       // 设置2个需要发送的Observable & 放入到集合中
        List<ObservableSource<Integer>> list= new ArrayList <>();
        // 第1个Observable延迟1秒发射数据
        list.add( Observable.just(1,2,3).delay(1,TimeUnit.SECONDS));
        // 第2个Observable正常发送数据
        list.add( Observable.just(4,5,6));

        // 一共需要发送2个Observable的数据
        // 但由于使用了amba(),所以仅发送先发送数据的Observable
        // 即第二个(因为第1个延时了)
        Observable.amb(list).subscribe(new Consumer<Integer>() {
            @Override
            public void accept(Integer integer) throws Exception {
                Log.e(TAG, "接收到了事件 "+integer);
            }
        });

即只发送了先发送数据的Observable的数据 = 4,5,6

9、defaultIfEmpty()

在不发送任何有效事件( Next事件)、仅发送了 Complete 事件的前提下,发送一个默认值

DefaultIfEmpty简单的精确地发射原始Observable的值,如果原始Observable没有发射任何数据正常终止(以onCompleted的形式),DefaultIfEmpty返回的Observable就发射一个你提供的默认值。

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                // 不发送任何有效事件
                //  e.onNext(1);
                //  e.onNext(2);

                // 仅发送Complete事件
                e.onComplete();
            }
        }).defaultIfEmpty(10) // 若仅发送了Complete事件,默认发送 值 = 10
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {
                        Log.d(TAG, "开始采用subscribe连接");
                    }

                    @Override
                    public void onNext(Integer value) {
                        Log.d(TAG, "接收到了事件"+ value  );
                    }

                    @Override
                    public void onError(Throwable e) {
                        Log.d(TAG, "对Error事件作出响应");
                    }

                    @Override
                    public void onComplete() {
                        Log.d(TAG, "对Complete事件作出响应");
                    }
                });

参考

1、Android:RxJava条件/布尔操作符
2、RxJava操作符——条件和布尔操作符(Conditional and Boolean Operators)
3、Rxjava(七):条件操作符和布尔操作符

以上是关于Android :RxJava学习笔记之条件/布尔操作符的主要内容,如果未能解决你的问题,请参考以下文章

Android :RxJava学习笔记之 错误处理

Android :RxJava学习笔记之合并操作符

Android :RxJava学习笔记之Subject

Android :RxJava学习笔记之Single

Android :RxJava学习笔记之创建操作符

Android :RxJava学习笔记之转换操作符