Rxjava2 Observable的数据过滤详解及实例

Posted jiangming-blogs

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rxjava2 Observable的数据过滤详解及实例相关的知识,希望对你有一定的参考价值。

接续上篇: Rxjava2 Observable的数据过滤详解及实例(一)

6. Filter

只发射通过了函数过滤的数据项。

技术图片

实例代码:

    // filter(Predicate<? super Integer> predicate)
    // 验证数据,决定是否发射数据
    Observable.range(1, 10)
            .filter(new Predicate<Integer>() {

                @Override
                public boolean test(Integer t) throws Exception {
                    // 进行测试验证是否需要发射数据
                    return t > 5 ? true : false;
                }
            }).subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept filter: " + t);
                }
            });

输出:

--> accept filter: 6
--> accept filter: 7
--> accept filter: 8
--> accept filter: 9
--> accept filter: 10

Javadoc: filter(predicate)

7. Frist

只发射第一项或者满足某个条件的第一项数据。如果你只对Observable发射的第一项数据,或者满足某个条件的第一项数据感兴趣,你可以使用 First 操作符。

Frist 操作符有以下几种操作:

7.1 firstElement()

只发射第一个数据,当数据存在的情况。

技术图片

实例代码:

    // 1. firstElement()
    // 只发射第一个数据
    Observable.range(1, 10)
        .firstElement()
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept firstElement(1): "  + t);
            }
        });

输出:

--> accept firstElement(1): 1

Javadoc: firstElement()

7.2 first(defaultItem)

first(defaultItem)firstElement() 类似,但是在Observagle没有发射任何数据时发射一个你在参数中指定的 defaultItem 默认值。

技术图片

实例代码:

    // 2. first(Integer defaultItem)
    // 发射第一个数据项,如果没有数据项,发送默认的defaultItem
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onComplete();
        }
    }).first(999) // 没有数据发送时,发送默认值999
      .subscribe(new Consumer<Integer>() {

          @Override
          public void accept(Integer t) throws Exception {
              System.out.println("--> accept first(2): " + t);
          }
      });

输出:

--> accept first(2): 999

Javadoc: first(defaultItem)

7.3 firstOrError()

发射第一个数据项,如果没有数据项,会发送 NoSuchElementException 通知。

技术图片

实例代码:

    // 3. first(Integer defaultItem)
    // 发射第一个数据项,如果没有数据项,会有Error: NoSuchElementException
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onComplete();
        }
    }).firstOrError() // 没有数据发送时,将会发送NoSuchElementException通知
      .subscribe(new SingleObserver<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe: ");
            }

            @Override
            public void onSuccess(Integer t) {
                System.out.println("--> accept onSuccess(3): " + t);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> acctpt onError(3): " + e);
            }
      });

输出:

--> onSubscribe: 
--> acctpt onError(3): java.util.NoSuchElementException

Javadoc: firstOrError()

8. Single

singlefirst 类似,但是如果原始Observable在完成之前不是正好发射一次数据,它会抛出一个NoSuchElementException 的通知。

Single 有以下几种操作:

8.1 singleElement()

发射单例数据,超过一个就会发送 NoSuchElementException 通知。

技术图片

实例代码:

    // 1.singleElement()
    // 发射单例数据,超过一个就会NoSuchElementException  
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onComplete();
        }
    }).singleElement() // 发送单个数据,大于1项数据就会有Error通知
      .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept singleElement(1): " + t);
            }
      },new Consumer<Throwable>() {

        @Override
        public void accept(Throwable t) throws Exception {
            System.out.println("--> OnError(1): " + t);
        }
    });

输出:

--> OnError(1): java.lang.IllegalArgumentException: Sequence contains more than one element!

Javadoc: singleElement()

8.2 single(defaultItem)

发射单例数据,没有接收到数据项则发送指定默认 defaultItem 数据。

技术图片

实例代码:

    // 2. single(Integer defaultItem)
    // 发射单例数据,没有数据项发送指定默认defaultItem
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onComplete();
        }
    }).single(999) // 没有接受到数据则发送默认数据999
      .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept single(2): " + t);
            }
      });

输出:

--> accept single(2): 999

Javadoc: single(defaultItem)

8.3 singleOrError()

发射一个单例的数据,如果数据源没有数据项,则发射一个 NoSuchElementException 通知。

技术图片

实例代码:

    // 3.singleOrError()
    // 发射一个单例的数据,如果数据源 没有数据项,则发送一个NoSuchElementException异常通知
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onComplete();
        }
    }).singleOrError() // 如果没有数据项发送,则发送一个NoSuchElementException异常通知
      .subscribe(new SingleObserver<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(3): ");
            }

            @Override
            public void onSuccess(Integer t) {
                System.out.println("--> onSuccess(3): " + t);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(3): " + e);
            }
        });

输出:

--> onSubscribe(3): 
--> onError(3): java.util.NoSuchElementException

Javadoc: singleOrError()

9. ElementAt

ElementAt 操作符获取原始Observable发射的数据序列指定索引位置的数据项,然后当做自己的唯一数据发射。

ElementAt 操作符有以下几种操作:

9.1 elementAt(index)

发射索引位置第 index 项数据(从0开始计数),如果数据不存在,会 IndexOutOfBoundsException 异常。

技术图片

实例代码:

    // 1. elementAt(long index)
    // 指定发射第N项数据(从0开始计数),如果数据不存在,会IndexOutOfBoundsException异常
    Observable.range(1, 10)
        .elementAt(5) // 发射数据序列中索引为5的数据项,索引从0开始
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept ElementAt(1): " + t);
            }
        });

输出:

--> accept ElementAt(1): 6

Javadoc: elementAt(index)

9.2 elementAt(index, defaultItem)

发射索引位置第 index 项数据(从0开始计数),如果数据不存在,发送默认 defaultItem 数据。

技术图片

实例代码:

    // 2. elementAt(long index, Integer defaultItem)
    // 指定发射第N项数据(从0开始计数),如果数据不存在,发送默认defaultItem
    Observable.range(1, 10)
        .elementAt(20, 0) // 发射索引第20项数据,不存在此项数据时,发送默认数据0
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept elementAt(2): " + t);
            }
        });

输出:

--> accept elementAt(2): 0

Javadoc: elementAt(index, defaultItem)

9.3 elementAtOrError(index)

发射索引位置第 index 项数据(从0开始计数),如果指定发射的数据不存在,会发射NoSuchElementException 异常通知。

技术图片

实例代码:

    // 3. elementAtOrError(long index)
    // 如果指定发射的数据不存在,会抛出NoSuchElementException
    Observable.range(1, 10)
        .elementAtOrError(50) // 发射索引为50的数据,不存在则发送NoSuchElementException异常通知
        .subscribe(new SingleObserver<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe(3): ");
            }

            @Override
            public void onSuccess(Integer t) {
                System.out.println("--> onSuccess(3): " + t);
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(3): " + e);
            }
        });

输出:

--> onSubscribe(3): 
--> onError(3): java.util.NoSuchElementException

Javadoc: elementAtOrError(index)

10. ignoreElements

不发射任何数据,只发射Observable的终止通知。

IgnoreElements 操作符抑制原始Observable发射的所有数据,只允许它的终止通知 (onError 或 onCompleted )通过。

技术图片

解析: 如果你不关心一个Observable发射的数据,但是希望在它完成时或遇到错误终止时收到通知,你可以对Observable使用 ignoreElements 操作符,它会确保永远不会调用观察者的 onNext() 方法。

实例代码:

    // ignoreElements()
    // 只接受onError或onCompleted通知,拦截onNext事件(不关心发射的数据,只希望在成功或者失败的时候收到通知)
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            //  int i = 1/0;
            emitter.onComplete();
        }
    }).ignoreElements()
      .subscribe(new CompletableObserver() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError: " + e);
            }

            @Override
            public void onComplete() {
                System.out.println("--> onComplete");
            }
      });

输出:

--> onSubscribe
--> onComplete

Javadoc: ignoreElements()

11. Last

只发射最后一项(或者满足某个条件的最后一项)数据。

如果你只对Observable发射的最后一项数据,或者满足某个条件的最后一项数据感兴趣,你可以使用 Last 操作符。

Last 有以下几种操作:

11.1 lastElement()

只发射最后一项数据,使用没有参数的 last 操作符,如果Observable中没有数据发送,则同样没有数据发送。

技术图片

实例代码:

    // 1. lastElement()
    // 接受最后一项数据
    Observable.create(new ObservableOnSubscribe<Integer>() {

        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
            emitter.onNext(3);
            emitter.onComplete();
        }
    }).lastElement() // 存在数据发送的话,即发射最后一项数据,否则没有数据发射
      .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept lastElement(1): " + t);
            }
        });

输出:

--> accept lastElement(1): 3

Javadoc: lastElement()

11.2 last(defaultItem)

只发射最后一项数据,如果Observable中没有数据发送,则发送指定的默认值 defaultItem

技术图片

实例代码:

    // 2. last(Integer defaultItem)
    // 接受最后一项数据,如果没有数据发送,发送默认数据:defaultItem
    Observable.range(0, 0)
        .last(999) // 接受最后一项数据,没有数据则发送默认数据999
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept: last(2): " + t);
            }
        });

输出:

--> accept: last(2): 999

Javadoc: last(defaultItem)

11.3 lastOrError()

接受最后一项数据,如果没有数据发送,抛出 NoSuchElementException 异常通知。

技术图片

实例代码:

    // 3. lastOrError()
    // 接受最后一项数据,如果没有数据发送,抛出onError: NoSuchElementException
    Observable.range(0, 0)
        .lastOrError() // 接受最后一项数据,如果没有数据,则反射NoSuchElementException异常通知
        .subscribe(new SingleObserver<Integer>() {

            @Override
            public void onSubscribe(Disposable d) {
                System.out.println("--> onSubscribe: ");
            }

            @Override
            public void onSuccess(Integer t) {
                System.out.println("--> onSuccess(3)");
            }

            @Override
            public void onError(Throwable e) {
                System.out.println("--> onError(3): " + e);
            }
        });

输出:

--> onSubscribe: 
--> onError(3): java.util.NoSuchElementException

Javadoc: lastOrError()

12. Take

使用Take操作符让你可以修改Observable的行为,只返回前面的N项数据,然后发射完成通知,忽略剩余的数据。

Take 操作符有以下几种操作:

12.1 take(count)

如果你对一个Observable使用 take(n) 操作符,而那个Observable发射的数据少于N项,那么 take 操作生成的Observable不会抛异常或发射 onError 通知,在完成前它只会发射相同的少量数据。

技术图片

实例代码:

    // 1. take(long count)
    // 返回前count项数据
    Observable.range(1, 100)
        .take(5) // 返回前5项数据
        .subscribe(new Consumer<Integer>() {

            @Override
            public void accept(Integer t) throws Exception {
                System.out.println("--> accept take(1): " + t);
            }
        });

输出:

--> accept take(1): 1
--> accept take(1): 2
--> accept take(1): 3
--> accept take(1): 4
--> accept take(1): 5

Javadoc: take(count)

12.2 take(timeout, TimeUnit)

取一定时间间隔内的数据,有可选参数 scheduler 指定线程调度器。

技术图片

实例代码:

    // 2. take(long time, TimeUnit unit,[Scheduler] scheduler)
    //  取一定时间间隔内的数据,可选参数scheduler指定线程调度器
    Observable.intervalRange(1, 10, 1, 1, TimeUnit.SECONDS)
        .take(5, TimeUnit.SECONDS) // 返回前5秒的数据项
        .subscribe(new Consumer<Long>() {

            @Override
            public void accept(Long t) throws Exception {
                System.out.println("--> accept take(2): " + t);
            }
        });

输出:

--> accept take(2): 1
--> accept take(2): 2
--> accept take(2): 3
--> accept take(2): 4
--> accept take(2): 5

Javadoc: take(timeout, TimeUnit)
Javadoc: take(timeout, TimeUnit, Scheduler)

13. TakeLast

使用 TakeLast 操作符修改原始Observable,你可以只发射Observable发射的后N项数据,忽略前面的数据。

takeLast 的这个变体默认在 computation 调度器上执行,但是你可以使用第三个参数指定其它的调度器。

TakeLast 一般有下面几种操作:

13.1 takeLast(count)

使用 takeLast(count) 操作符,你可以只发射原始Observable发射的后 count 项数据(或者原始Observable发射onCompleted() 前的 count 项数据),忽略之前的数据。 注意:这会延迟原始Observable发射的任何数据项,直到它全部完成。

技术图片

实例代码:

    // 1. takeLast(int count)
    // 接受Observable数据发射完成前的Count项数据, 忽略前面的数据
    Observable.range(1, 10)
            .doOnNext(new Consumer<Integer>() {
                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept(1): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(1): ");
                }
            })
            .takeLast(5) // 发送数据发射完成前的5项数据
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept takeLast(1): " + t);
                }
            });

输出:

--> accept(1): 1
--> accept(1): 2
--> accept(1): 3
--> accept(1): 4
--> accept(1): 5
--> accept(1): 6
--> accept(1): 7
--> accept(1): 8
--> accept(1): 9
--> accept(1): 10
--> onCompleted(1): 
--> accept takeLast(1): 6
--> accept takeLast(1): 7
--> accept takeLast(1): 8
--> accept takeLast(1): 9
--> accept takeLast(1): 10

Javadoc: takeLast(count)

13.2 takeLast(time, TimeUnit)

还有一个 takeLast 变体接受一个时长而不是数量参数。它会发射在原始Observable的生命周期内最后一段时间内发射的数据。时长和时间单位通过参数指定。

注意: 这会延迟原始Observable发射的任何数据项,直到它全部完成。

技术图片

实例代码:

    // 2. takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
    // 可选参数 scheduler:指定工作调度器  delayError:延迟Error通知  bufferSize:指定缓存大小
    // 接受Observable数据发射完成前指定时间间隔发射的数据项
    Observable.intervalRange(1, 5, 1, 1, TimeUnit.SECONDS)
            .doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(2): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(2): ");
                }
            })
            .takeLast(3, TimeUnit.SECONDS) // 发送数据发射完成前3秒时间段内的数据
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept takeLast(2): " + t);
                }
            });                                                     

输出:

--> accept(2): 1
--> accept(2): 2
--> accept(2): 3
--> accept(2): 4
--> accept(2): 5
--> onCompleted(2): 
--> accept takeLast(2): 3
--> accept takeLast(2): 4
--> accept takeLast(2): 5

Javadoc: takeLast(long time, TimeUnit unit)
Javadoc: takeLast(long time, TimeUnit unit, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError)
Javadoc: takeLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)

13.3 takeLast(count, time, TimeUnit)

接受 Observable 发射完成前 time 时间段内收集 count 项数据并发射。

技术图片

示例代码:

    // 3. takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
    // 可选参数 scheduler:指定工作调度器  delayError:延迟Error通知  bufferSize:指定缓存大小
    // 接受Observable数据发射完成前time时间段内收集count项数据并发射
    Observable.intervalRange(1, 10, 1, 100, TimeUnit.MILLISECONDS)
            .doOnNext(new Consumer<Long>() {
                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept(3): " + t);
                }
            })
            .doOnComplete(new Action() {
                @Override
                public void run() throws Exception {
                    System.out.println("--> onCompleted(3): ");
                }
            })
            .takeLast(2, 500, TimeUnit.MILLISECONDS) // 在原数据发射完成前500毫秒内接受2项数据
            .subscribe(new Consumer<Long>() {

                @Override
                public void accept(Long t) throws Exception {
                    System.out.println("--> accept takeLast(3): " + t);
                }
            });

输出:

--> accept(3): 1
--> accept(3): 2
--> accept(3): 3
--> accept(3): 4
--> accept(3): 5
--> accept(3): 6
--> accept(3): 7
--> accept(3): 8
--> accept(3): 9
--> accept(3): 10
--> onCompleted(3): 
--> accept takeLast(3): 9
--> accept takeLast(3): 10

Javadoc: takeLast(long count, long time, TimeUnit unit)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler)
Javadoc: takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)

14. OfType

ofType 是 filter 操作符的一个特殊形式。它过滤一个Observable只返回指定类型的数据。

技术图片

示例代码:

    Object[] dataObjects = {1, "Hello", 2.1f, 8.88, "1", new Integer(5)};
    // ofType(Class clazz)
    // 过滤数据,只返回特定类型的数据
    Observable.fromArray(dataObjects)
            .ofType(Integer.class) // 过滤Integer类型的数据
            .subscribe(new Consumer<Integer>() {

                @Override
                public void accept(Integer t) throws Exception {
                    System.out.println("--> accept ofType: " + t);
                }
            });

输出:

--> accept ofType: 1
--> accept ofType: 5

Javadoc: ofType(Class clazz)

小结:

数据过滤的操作符主要是过滤被观察者(Observable)发射的数据序列,按照指定的规则过滤数据项,忽略并丢弃其他的数据。实际开发场景如网络数据的过滤,数据库数据的过滤等,是开发中重要且常见的操作之一。

Rx介绍与讲解及完整目录参考:Rxjava2 介绍与详解实例

实例代码:

以上是关于Rxjava2 Observable的数据过滤详解及实例的主要内容,如果未能解决你的问题,请参考以下文章

Rxjava2 Observable的布尔操作符详解及实例

Rxjava2 Observable的数据变换详解及实例

Rxjava2 Observable的数据变换详解及实例

Rxjava2 Observable的创建详解及实例

Rxjava2 可连接的Observable(ConnectableObservable)操作详解及实例

Rxjava2 Observable的结合操作详解及实例