[RxJava 响应式编程] 奉上一篇的全面的 RxJava2 方法总结·上
Posted Code Brick
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[RxJava 响应式编程] 奉上一篇的全面的 RxJava2 方法总结·上相关的知识,希望对你有一定的参考价值。
看了许多讲解 RxJava 的文章,有些文章讲解的内容是基于第一个版本的,有些文章的讲解是通过比较常用的一些 API 和基础的概念进行讲解的。每次看到 RxJav a的类中的几十个方法的时候,总是感觉心里没底。所以,我打算自己去专门写篇文章来从 API 的角度系统地梳理一下 RxJava 的各种方法和用法。
1、RxJava 基本
1.1 RxJava 简介
RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。
虽然,在 android 中,我们可以使用 AsyncTask 来完成异步任务操作,但是当任务的梳理比较多的时候,我们要为每个任务定义一个 AsyncTask 就变得非常繁琐。RxJava 帮助我们在实现异步执行的前提下保持代码的清晰。它的原理就是创建一个 Observable 来完成异步任务,组合使用各种不同的链式操作,来实现各种复杂的操作,最终将任务的执行结果发射给 Observer
进行处理。当然,RxJava 不仅适用于Android,也适用于服务端等各种场景。
我们总结以下 RxJava 的用途:
简化异步程序的流程;
提供了近似于 Java 8 的流的操作:因为想要在 Android 中使用 Java 8 的流编程有诸多的限制,所以我们可以使用 RxJava 来弥补这个缺陷。
在使用RxJava之前,我们需要先在自己的项目中添加如下的依赖:
compile 'io.reactivex.rxjava2:rxjava:2.2.0'
compile 'io.reactivex.rxjava2:rxandroid:2.0.2'
这里我们使用的是 RxJava2,它与 RxJava 的第一个版本有些许不同。在本文中,我们所有的关于 RxJava 的示例都将基于 RxJava2.
注:如果想了解关于 Java 8 的流编程的内容的内容,可以参考我之前写过的文章《五分钟学习 Java 8 的流编程》。
1.2 概要
下面是 RxJava 的一个基本的用例,这里我们定义了一个 Observable,然后在它内部使用 emitter
发射了一些数据和信息(其实就相当于调用了被观察对象内部的方法,通知所有的观察者)。然后,我们用 Consumer
接口的实例作为 subscribe()
方法的参数来观察发射的结果。(这里的接口的方法都已经被使用 Lambda 简化过,应该学着适应它。)
Observable<Integer> observable = Observable.create(emitter -> {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
});
observable.subscribe(System.out::println);
这样,我们就完成了一个基本的 RxJava 的示例。从上面的例子中,你或许没法看出 Observable 中隐藏的流的概念,看下面的例子:
Observable.range(0, 10).map(String::valueOf).forEach(System.out::println);
这里我们先用 Observable.range()
方法产生一个序列,然后用 map()
方法将该整数序列映射成一个字符序列,最后将得到的序列输出来。从上面看出,这种操作和 Java 8 里面的 Stream 编程很像。但是两者之间是有区别的:
所谓的“推”和“拉”的区别:Stream 中是通过从流中读取数据来实现链式操作,而 RxJava 除了 Stream 中的功能之外,还可以通过“发射”数据,来实现通知的功能,即 RxJava 在 Stream 之上又多了一个观察者的功能。
Java8 中的 Stream 可以通过
parall()
来实现并行,即基于分治算法将任务分解并计算得到结果之后将结果合并起来;而 RxJava 只能通过subscribeOn()
方法将所有的操作切换到某个线程中去。Stream 只能被消费一次,但是 Observable 可以被多次进行订阅;
RxJava 除了为我们提供了 Observable 之外,在新的 RxJava 中还提供了适用于其他场景的基础类,它们之间的功能和主要区别如下:
Flowable: 多个流,响应式流和背压
Observable: 多个流,无背压
Single: 只有一个元素或者错误的流
Completable: 没有任何元素,只有一个完成和错误信号的流
Maybe: 没有任何元素或者只有一个元素或者只有一个错误的流
除了上面的几个基础类之外,还有一个 Disposable。当我们监听某个流的时候,就能获取到一个 Disposable 对象。它提供了两个方法,一个是 isDisposed
,可以被用来判断是否停止了观察指定的流;另一个是 dispose()
方法,用来放弃观察指定的流,我们可以使用它在任意的时刻停止观察操作。
1.3 总结
上面我们介绍了了关于 RxJava 的基本的概念和使用方式,在下面的文章中我们会按照以上定义的顺序从 API 的角度来讲解以下 RxJava 各个模块的使用方法。
2、RxJava 的使用
2.1 Observable
从上面的文章中我们可以得知,Observable 和后面3种操作功能近似,区别在于 Flowable 加入了背压的概念,Observable 的大部分方法也适用于其他 3 个操作和 Flowable。因此,我们这里先从 Observable 开始梳理,然后我们再专门对 Flowable 和背压的进行介绍。
Observable 为我们提供了一些静态的构造方法来创建一个 Observable 对象,还有许多链式的方法来完成各种复杂的功能。这里我们按照功能将它的这些方法分成各个类别并依次进行相关的说明。
2.1.1 创建操作
1.interval & intervalRange
下面的操作可以每个 3 秒的时间发送一个整数,整数从 0 开始:
Observable.interval(3, TimeUnit.SECONDS).subscribe(System.out::println);
如果想要设置从指定的数字开始也是可以的,实际上 interval()
提供了许多重载方法供我们是使用。下面我们连同与之功能相近的 intervalRange()
方法也一同给出:
1.public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
2.public static Observable<Long> interval(long period, TimeUnit unit, Scheduler scheduler)
3.public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
这里的 initialDelay
参数用来指示开始发射第一个整数的之前要停顿的时间,时间的单位与 peroid
一样,都是通过 unit
参数来指定的;period
参数用来表示每个发射之间停顿多少时间;unit
表示时间的单位,是 TimeUnit
类型的;scheduler
参数指定数据发射和等待时所在的线程。
intervalRange()
系列方法可以用来将发射的整数序列限制在一个范围之内,这里的 start
用来表示发射的数据的起始值,count
表示总共要发射几个数字,其他参数与上面的 interval
方法一致。
2.range & rangeLong
下面的操作可以产生一个从 5 开始的连续 10 个整数构成的序列:
Observable.range(5, 10).subscribe(i -> System.out.println("1: " + i));
该方法需要传入两个参数,与之有相同功能的方法还有 rangeLong()
:
1.public static Observable<Integer> range(final int start, final int count)
2.public static Observable<Long> rangeLong(long start, long count)
这里的两个参数 start
用来指定用于生成的序列的开始值,count
用来指示要生成的序列总共包含多少个数字,上面的两个方法的主要区别在于一个是用来生成 int 型整数的,一个是用来生成 long 型整数的。
3.create
create()
方法用于从头开始创建一个 Observable,像下面显示的那样,你需要使用 create()
方法并传一个发射器作为参数,在该发射器内部调用 onNext()
、onComplete()
和 onError()
方法就可以将数据发送给监听者。
Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
observableEmitter.onNext(1);
observableEmitter.onNext(2);
observableEmitter.onComplete();
}).subscribe(System.out::println);
4.defer
defer()
直到有观察者订阅时才创建 Observable,并且为每个观察者创建一个新的 Observable。defer()
操作符会一直等待直到有观察者订阅它,然后它使用 Observable 工厂方法生成一个 Observable。比如下面的代码两个订阅输出的结果是不一致的:
Observable<Long> observable = Observable.defer((Callable<ObservableSource<Long>>) () -> Observable.just(System.currentTimeMillis()));
observable.subscribe(System.out::print);
System.out.println();
observable.subscribe(System.out::print);
下面是该方法的定义,它接受一个 Callable 对象,可以在该对象中返回一个 Observable 的实例:
public static <T> Observable<T> defer(Callable<? extends ObservableSource<? extends T>> supplier)
5.empty & never & error
1.public static <T> Observable<T> empty()
:创建一个不发射任何数据但是正常终止的 Observable;
2.public static <T> Observable<T> never()
:创建一个不发射数据也不终止的 Observable;
3.public static <T> Observable<T> error(Throwable exception)
:创建一个不发射数据以一个错误终止的 Observable,它有几个重载版本,这里给出其中的一个。
测试代码:
Observable.empty().subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
Observable.never().subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
Observable.error(new Exception()).subscribe(i->System.out.print("next"),i->System.out.print("error"),()->System.out.print("complete"));
输出结果:completeerror
6.from 系列
from()
系列的方法用来从指定的数据源中获取一个 Observable:
1.public static <T> Observable<T> fromArray(T... items)
:从数组中获取;
2.public static <T> Observable<T> fromCallable(Callable<? extends T> supplier)
:从 Callable 中获取;
3.public static <T> Observable<T> fromFuture(Future<? extends T> future)
:从 Future 中获取,有多个重载版本,可以用来指定线程和超时等信息;
4.public static <T> Observable<T> fromIterable(Iterable<? extends T> source)
:从 Iterable 中获取;
5.public static <T> Observable<T> fromPublisher(Publisher<? extends T> publisher)
:从 Publisher 中获取。
7.just 系列
just 系列的方法的一个参数的版本为下面的形式:public static <T> Observable<T> just(T item)
,它还有许多个重载的版本,区别在于接受的参数的个数不同,最少 1 个,最多 10 个。
8.repeat 系列
该方法用来表示指定的序列要发射多少次,下面的方法会将该序列无限次进行发送:
Observable.range(5, 10).repeat().subscribe(i -> System.out.println(i));
repeat()
方法有以下几个相似方法:
1.public final Observable<T> repeat()
2.public final Observable<T> repeat(long times)
3.public final Observable<T> repeatUntil(BooleanSupplier stop)
4.public final Observable<T> repeatWhen(Function<? super Observable<Object>, ? extends ObservableSource<?>> handler)
第 1 个无参的方法会无限次地发送指定的序列(实际上内部调用了第 2 个方法并传入了 Long.MAX_VALUE),第 2 个方法会将指定的序列重复发射指定的次数;第 3 个方法会在满足指定的要求的时候停止重复发送,否则会一直发送。
9.timer
timer 操作符创建一个在给定的时间段之后返回一个特殊值的 Observable,它在延迟一段给定的时间后发射一个简单的数字 0。比如下面的程序会在 500 毫秒之后输出一个数字 0
。
Observable.timer(500, TimeUnit.MILLISECONDS).subscribe(System.out::print);
下面是该方法及其重载方法的定义,重载方法还可以指定一个调度器:
1.public static Observable<Long> timer(long delay, TimeUnit unit)
2.public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler)
2.1.2 变换操作
1.map & cast
1.map
操作符对原始 Observable 发射的每一项数据应用一个你选择的函数,然后返回一个发射这些结果的 Observable。默认不在任何特定的调度器上执行。
2.cast
操作符将原始 Observable 发射的每一项数据都强制转换为一个指定的类型(多态),然后再发射数据,它是 map 的一个特殊版本。
下面的第一段代码用于将生成的整数序列转换成一个字符串序列之后并输出;第二段代码用于将 Date 类型转换成 Object 类型并进行输出,这里如果前面的 Class 无法转换成第二个 Class 就会出现异常:
Observable.range(1, 5).map(String::valueOf).subscribe(System.out::println);
Observable.just(new Date()).cast(Object.class).subscribe(System.out::print);
这两个方法的定义如下:
1.public final <R> Observable<R> map(Function<? super T, ? extends R> mapper)
2.public final <U> Observable<U> cast(Class<U> clazz)
这里的 mapper
函数接受两个泛型,一个表示原始的数据类型,一个表示要转换之后的数据类型,转换的逻辑写在该接口实现的方法中即可。
2.flatMap & contactMap
flatMap
将一个发送事件的上游 Observable 变换为多个发送事件的 Observables,然后将它们发射的事件合并后放进一个单独的 Observable 里。需要注意的是, flatMap() 并不保证事件的顺序,也就是说转换之后的 Observables 的顺序不必与转换之前的序列的顺序一致。比如下面的代码用于将一个序列构成的整数转换成多个单个的 Observable,然后组成一个 Observable,并被订阅。下面输出的结果仍将是一个字符串数字序列,只是顺序不一定是增序的。【考点】
Observable.range(1, 5)
.flatMap((Function<Integer, ObservableSource<String>>) i -> Observable.just(String.valueOf(i)))
.subscribe(System.out::println);
与 flatMap()
对应的方法是 contactMap()
,后者能够保证最终输出的顺序与上游发送的顺序一致。下面是这两个方法的定义:
1.public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
2.public final <R> Observable<R> concatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper)
flatMap()
的重载方法数量过多,它们在数据源方面略有不同,有的支持错误等可选参数,具体可以参考源代码。
3.flatMapIterable
flatMapIterable()
可以用来将上流的任意一个元素转换成一个 Iterable
对象,然后我们可以对其进行消费。在下面的代码中,我们先生成一个整数的序列,然后将每个整数映射成一个 Iterable<string>
类型,最后,我们对其进行订阅和消费:
Observable.range(1, 5)
.flatMapIterable((Function<Integer, Iterable<String>>) integer -> Collections.singletonList(String.valueOf(integer)))
.subscribe(s -> System.out.println("flatMapIterable : " + s));
下面是该方法及其重载方法的定义:
1.public final <U> Observable<U> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper)
2.public final <U, V> Observable<V> flatMapIterable(Function<? super T, ? extends Iterable<? extends U>> mapper, BiFunction<? super T, ? super U, ? extends V> resultSelector)
4.buffer
该方法用于将整个流进行分组。以下面的程序为例,我们会先生成一个 7 个整数构成的流,然后使用 buffer()
之后,这些整数会被 3 个作为一组进行输出,所以当我们订阅了 buffer()
转换之后的 Observable 之后得到的是一个列表构成的 Observable:
Observable.range(1, 7).buffer(3)
.subscribe(integers -> System.out.println(Arrays.toString(integers.toArray())));
下面是这个方法及其重载方法的定义,它的重载方法太多,这里我们只给出其中的两个,其他的可以参考 RxJava 的源码。这里的 buffer 应该理解为一个缓冲区,当缓冲区满了或者剩余的数据不够一个缓冲区的时候就将数据发射出去。
1.public final Observable<List<T>> buffer(int count)
2.public final Observable<List<T>> buffer(int count, int skip)
3….
5.groupBy
groupBy()
用于分组元素,它可以被用来根据指定的条件将元素分成若干组。它将得到一个 Observable<GroupedObservable<T, M>>
类型的 Observable。如下面的程序所示,这里我们使用 concat()
方法先将两个 Observable 拼接成一个 Observable,然后对其元素进行分组。这里我们的分组依据是整数的值,这样我们将得到一个 Observable<GroupedObservable<Integer, Integer>>
类型的 Observable。然后,我们再将得到的序列拼接成一个并进行订阅输出:
Observable<GroupedObservable<Integer, Integer>> observable = Observable.concat(
Observable.range(1,4), Observable.range(1,6)).groupBy(integer -> integer);
Observable.concat(observable).subscribe(integer -> System.out.println("groupBy : " + integer));
该方法有多个重载版本,这里我们用到的一个的定义是:
public final <K> Observable<GroupedObservable<K, T>> groupBy(Function<? super T, ? extends K> keySelector)
6.scan
scan()
操作符对原始 Observable 发射的第一项数据应用一个函数,然后将那个函数的结果作为自己的第一项数据发射。它将函数的结果同第二项数据一起填充给这个函数来产生它自己的第二项数据。它持续进行这个过程来产生剩余的数据序列。这个操作符在某些情况下被叫做 accumulator。
以下面的程序为例,该程序的输结果是 2 6 24 120 720
,可以看出这里的计算规则是,我们把传入到 scan
中的函数记为 f
,序列记为 x
,生成的序列记为 y
,那么这里的计算公式是 y(0)=x(0); y(i)=f(y(i-1), x(i)), i>0
:
Observable.range(2, 5).scan((i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));
除了上面的这种形式,scan()
方法还有一个重载的版本,我们可以使用这个版本的方法来在生成序列的时候指定一个初始值。以下面的程序为例,它的输出结果是 3 6 18 72 360 2160
,可以看出它的输出比上面的形式多了 1 个,这是因为当指定了初始值之后,生成的第一个数字就是那个初始值,剩下的按照我们上面的规则进行的。所以,用同样的函数语言来描述的话,那么它就应该是下面的这种形式:y(0)=initialValue; y(i)=f(y(i-1), x(i)), i>0
。
Observable.range(2, 5).scan(3, (i1, i2) -> i1 * i2).subscribe(i -> System.out.print(i + " "));
以上方法的定义是:
1.public final Observable<T> scan(BiFunction<T, T, T> accumulator)
2.public final <R> Observable<R> scan(R initialValue, BiFunction<R, ? super T, R> accumulator)
7.window
window
和 buffer 类似,但不是发射来自原始 Observable 的数据包,它发射的是 Observable,这些 Observables 中的每一个都发射原始 Observable 数据的一个子集,最后发射一个 onCompleted 通知。
以下面的程序为例,这里我们首先生成了一个由 10 个数字组成的整数序列,然后使用 window()
函数将它们每 3 个作为一组,每组会返回一个对应的 Observable 对象。这里我们对该返回的结果进行订阅并进行消费,因为 10 个数字,所以会被分成 4 个组,每个对应一个Observable:
Observable.range(1, 10).window(3).subscribe(
observable -> observable.subscribe(integer -> System.out.println(observable.hashCode() + " : " + integer)));
除了对数据包进行分组,我们还可以根据时间来对发射的数据进行分组。该方法有多个重载的版本,这里我们给出其中的比较具有代表性的几个:
1.public final Observable<Observable<T>> window(long count)
2.public final Observable<Observable<T>> window(long timespan, long timeskip, TimeUnit unit)
3.public final <B> Observable<Observable<T>> window(ObservableSource<B> boundary)
4.public final <B> Observable<Observable<T>> window(Callable<? extends ObservableSource<B>> boundary)
2.1.3 过滤操作
1.filter
filter()
用来根据指定的规则对源进行过滤,比如下面的程序用来过滤整数 1 到 10 中所有大于 5 的数字:
Observable.range(1,10).filter(i -> i > 5).subscribe(System.out::println);
下面是该方法的定义:
1.public final Observable<T> filter(Predicate<? super T> predicate)
2.elementAt & firstElement & lastElement
elementAt()
用来获取源中指定位置的数据,它有几个重载方法,这里我们介绍一下最简单的一个方法的用法。下面是 elementAt()
的一个示例,它将获取源数据中索引为 1 的元素并交给观察者订阅。下面的程序将输出 1
:
Observable.range(1, 10).elementAt(0).subscribe(System.out::print);
这里我们给出 elementAt()
及其相关的方法的定义,它们的使用相似。注意一下这里的返回类型:
1.public final Maybe<T> elementAt(long index)
2.public final Single<T> elementAt(long index, T defaultItem)
3.public final Single<T> elementAtOrError(long index)
除了获取指定索引的元素的方法之外,RxJava 中还有可以用来直接获取第一个和最后一个元素的方法,这里我们直接给出方法的定义:
1.public final Maybe<T> firstElement()
2.public final Single<T> first(T defaultItem)
3.public final Single<T> firstOrError()
4.public final Maybe<T> lastElement()
5.public final Single<T> last(T defaultItem)
6.public final Single<T> lastOrError()
3.distinct & distinctUntilChanged
distinct()
用来对源中的数据进行过滤,以下面的程序为例,这里会把重复的数字 7 过滤掉:
Observable.just(1,2,3,4,5,6,7,7).distinct().subscribe(System.out::print);
与之类似的还有 distinctUntilChanged()
方法,与 distinct()
不同的是,它只当相邻的两个元素相同的时候才会将它们过滤掉。比如下面的程序会过滤掉其中的2和5,所以最终的输出结果是 12345676
:
Observable.just(1,2,2,3,4,5,5,6,7,6).distinctUntilChanged().subscribe(System.out::print);
该方法也有几个功能相似的方法,这里给出它们的定义如下:
1.public final Observable<T> distinct()
2.public final <K> Observable<T> distinct(Function<? super T, K> keySelector)
3.public final <K> Observable<T> distinct(Function<? super T, K> keySelector, Callable<? extends Collection<? super K>> collectionSupplier)
4.public final Observable<T> distinctUntilChanged()
5.public final <K> Observable<T> distinctUntilChanged(Function<? super T, K> keySelector)
6.public final Observable<T> distinctUntilChanged(BiPredicate<? super T, ? super T> comparer)
4.skip & skipLast & skipUntil & skipWhile
skip()
方法用于过滤掉数据的前 n 项,比如下面的程序将会过滤掉前 2 项,因此输出结果是 345
:
Observable.range(1, 5).skip(2).subscribe(System.out::print);
与 skip()
方法对应的是 take()
方法,它用来表示只选择数据源的前 n 项,该方法的示例就不给出了。这里,我们说一下与之类功能类似的重载方法。skip()
还有一个重载方法接受两个参数,用来表示跳过指定的时间,也就是在指定的时间之后才开始进行订阅和消费。下面的程序会在 3 秒之后才开始不断地输出数字:
Observable.range(1,5).repeat().skip(3, TimeUnit.SECONDS).subscribe(System.out::print);
与 skip()
功能相反的方法的还有 skipLast()
,它用来表示过滤掉后面的几项,以及最后的一段时间不进行发射等。比如下面的方法,我们会在程序开始之前进行计时,然后会不断重复输出数字,直到 5 秒之后结束。然后,我们用 skipLast()
方法表示最后的 2 秒不再进行发射。所以下面的程序会先不断输出数字 3 秒,3 秒结束后停止输出,并在 2 秒之后结束程序:
long current = System.currentTimeMillis();
Observable.range(1,5)
.repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
.skipLast(2, TimeUnit.SECONDS).subscribe(System.out::print);
与上面的这些方法类似的还有一些,这里我们不再一一列举。因为这些方法的重载方法比较多,下面我们给出其中的具有代表性的一部分:
1.public final Observable<T> skip(long count)
2.public final Observable<T> skip(long time, TimeUnit unit, Scheduler scheduler)
3.public final Observable<T> skipLast(int count)
4.public final Observable<T> skipLast(long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
5.public final <U> Observable<T> skipUntil(ObservableSource<U> other)
6.public final Observable<T> skipWhile(Predicate<? super T> predicate)
5.take & takeLast & takeUntil & takeWhile
与 skip()
方法对应的是 take()
方法,它表示按照某种规则进行选择操作。我们以下面的程序为例,这里第一段程序表示只发射序列中的前 2 个数据:
Observable.range(1, 5).take(2).subscribe(System.out::print);
下面的程序表示只选择最后 2 秒中输出的数据:
long current = System.currentTimeMillis();
Observable.range(1,5)
.repeatUntil(() -> System.currentTimeMillis() - current > TimeUnit.SECONDS.toMillis(5))
.takeLast(2, TimeUnit.SECONDS).subscribe(System.out::print);
下面是以上相关的方法的定义,同样的,我们只选择其中比较有代表性的几个:
1.public final Observable<T> take(long count)
2.public final Observable<T> takeLast(long count, long time, TimeUnit unit, Scheduler scheduler, boolean delayError, int bufferSize)
3.public final <U> Observable<T> takeUntil(ObservableSource<U> other)
4.public final Observable<T> takeUntil(Predicate<? super T> stopPredicate)
5.public final Observable<T> takeWhile(Predicate<? super T> predicate)
6.ignoreElements
该方法用来过滤所有源 Observable 产生的结果,只会把 Observable 的 onComplete 和 onError 事件通知给订阅者。下面是该方法的定义:
1.public final Completable ignoreElements()
7.throttleFirst & throttleLast & throttleLatest & throttleWithTimeout
这些方法用来对输出的数据进行限制,它们是通过时间的“窗口”来进行限制的,你可以理解成按照指定的参数对时间进行分片,然后根据各个方法的要求选择第一个、最后一个、最近的等进行发射。下面是 throttleLast()
方法的用法示例,它会输出每个 500 毫秒之间的数字中最后一个数字:
Observable.interval(80, TimeUnit.MILLISECONDS)
.throttleLast(500, TimeUnit.MILLISECONDS)
.subscribe(i -> System.out.print(i + " "));
其他的几个方法的功能大致列举如下:
1.throttleFirst()
:只会发射指定的 Observable 在指定的事件范围内发射出来的第一个数据;
2.throttleLast()
:只会发射指定的 Observable 在指定的事件范围内发射出来的最后一个数据;
3.throttleLatest()
:用来发射距离指定的时间分片最近的那个数据;
4.throttleWithTimeout()
:仅在过了一段指定的时间还没发射数据时才发射一个数据,如果在一个时间片达到之前,发射的数据之后又紧跟着发射了一个数据,那么这个时间片之内之前发射的数据会被丢掉,该方法底层是使用 debounce()
方法实现的。如果数据发射的频率总是快过这里的 timeout()
参数指定的时间,那么将不会再发射出数据来。
下面是这些方法及其重载方法的定义(选择其中一部分):
1.public final Observable<T> throttleFirst(long skipDuration, TimeUnit unit, Scheduler scheduler)
2.public final Observable<T> throttleLast(long intervalDuration, TimeUnit unit, Scheduler scheduler)
3.public final Observable<T> throttleLatest(long timeout, TimeUnit unit, Scheduler scheduler, boolean emitLast)
4.public final Observable<T> throttleWithTimeout(long timeout, TimeUnit unit, Scheduler scheduler)
8.debounce
debounce()
也是用来限制发射频率过快的,它仅在过了一段指定的时间还没发射数据时才发射一个数据。我们通过下面的图来说明这个问题:
这里红、绿、蓝三个球发射出来的原因都是因为当反射了这个球之后的一定的时间内没有其他的球发射出来,这个时间是我们可以通过参数来指定的。
该方法的用法与 throttle()
之类的方法类似,上面也说过 throttle()
那些方法底层用了 debounce()
实现,所以,这里我们不再为该方法专门编写相关的测试代码。
9.sample
实际上 throttleLast()
的实现中内部调用的就是 sample()
方法。
小节
这是 RxJava 的使用第一部分,由于文章篇幅比较长,我们将剩下的文章划分到了 《[RxJava 响应式编程] 奉上一篇的全面的 RxJava2 方法总结·下》 里面,欢迎继续阅读~
关于
这是 [响应式编程] 系列的第二部分,完整系列文章如下:
[RxJava 响应式编程] 奉上一篇的全面的 RxJava2 方法总结·上
[RxJava 响应式编程] 奉上一篇的全面的 RxJava2 方法总结·下
[RxJava 响应式编程] 使用 RxJava 的 Flowable 和背压功能
[RxJava 响应式编程] RxJava 的实际应用示范,用 RxJava 打造 EventBus
[RxJava 响应式编程] 最硬核,RxJava2 源码和原理分析
感谢您的关注~
以上是关于[RxJava 响应式编程] 奉上一篇的全面的 RxJava2 方法总结·上的主要内容,如果未能解决你的问题,请参考以下文章
电子书《Java编程方法论:响应式RxJava与代码设计实战》