[RxJava 响应式编程] 奉上一篇全面的 RxJava2 方法总结·下
Posted Code Brick
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[RxJava 响应式编程] 奉上一篇全面的 RxJava2 方法总结·下相关的知识,希望对你有一定的参考价值。
[接上文~]
2.1.4 组合操作
1.startWith & startWithArray
startWith()
方法可以用来在指定的数据源的之前插入几个数据,它的功能类似的方法有 startWithArray()
,另外还有几个重载方法。这里我们给出一个基本的用法示例,下面的程序会在原始的数字流 1-5 的前面加上 0,所以最终的输出结果是 012345
:
Observable.range(1,5).startWith(0).subscribe(System.out::print);
下面是 startWith()
及其几个功能相关的方法的定义:
1.public final Observable<T> startWith(Iterable<? extends T> items)
2.public final Observable<T> startWith(ObservableSource<? extends T> other)
3.public final Observable<T> startWith(T item)
4.public final Observable<T> startWithArray(T... items)
2.merge & mergeArray
merge()
可以让多个数据源的数据合并起来进行发射,当然它可能会让 merge()
之后的数据交错发射。下面是一个示例,这个例子中,我们使用 merge()
方法将两个 Observable 合并到了一起进行监听:
Observable.merge(Observable.range(1,5), Observable.range(6,5)).subscribe(System.out::print);
鉴于 merge()
方法及其功能类似的方法太多,我们这里挑选几个比较有代表性的方法,具体的可以查看 RxJava 的源代码:
1.public static <T> Observable<T> merge(Iterable<? extends ObservableSource<? extends T>> sources)
2.public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources)
3.public static <T> Observable<T> mergeDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
4.public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)
这里的 mergeError()
方法与 merge()
方法的表现一致,只是在处理由 onError()
触发的错误的时候有所不同。mergeError()
方法会等待所有的数据发射完毕之后才把错误发射出来,即使多个错误被触发,该方法也只会发射出一个错误信息。而如果使用 merger()
方法,那么当有错误被触发的时候,该错误会直接被抛出来,并结束发射操作。下面是该方法的一个使用的示例,这里我们主线程停顿 4 秒,然后所有 merge()
的 Observable 中的一个会在线程开始的第 2 秒的时候触发一个错误,该错误最终会在所有的数据发射完毕之后被发射出来:
Observable.mergeDelayError(Observable.range(1,5),
Observable.range(1,5).repeat(2),
Observable.create((ObservableOnSubscribe<String>) observableEmitter -> {
Thread.sleep(2000);
observableEmitter.onError(new Exception("error"));
})
).subscribe(System.out::print, System.out::print);
Thread.sleep(4000);
3.concat & concatArray & concatEager
该方法也是用来将多个 Observable 拼接起来,但是它会严格按照传入的 Observable 的顺序进行发射,一个 Observable 没有发射完毕之前不会发射另一个 Observable 里面的数据。下面是一个程序示例,这里传入了两个 Observable,会按照顺序输出 12345678910
:
Observable.concat(Observable.range(1, 5), Observable.range(6, 5)).subscribe(System.out::print);
下面是该方法的定义,鉴于该方法及其重载方法太多,这里我们选择几个比较有代表性的说明:
1.public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources)
2.public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
3.public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
4.public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
5.public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources)
6.public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources)
对于 concat()
方法,我们之前已经介绍过它的用法;这里的 conactArray()
的功能与之类似;对于 concatEager()
方法,当一个观察者订阅了它的结果,那么就相当于订阅了它拼接的所有 ObservableSource
,并且会先缓存这些 ObservableSource 发射的数据,然后再按照顺序将它们发射出来。而对于这里的 concatDelayError()
方法的作用和前面的 mergeDelayError()
类似,只有当所有的数据都发射完毕才会处理异常。
4.zip & zipArray & zipIterable
zip()
操作用来将多个数据项进行合并,可以通过一个函数指定这些数据项的合并规则。比如下面的程序的输出结果是 6 14 24 36 50
,显然这里的合并的规则是相同索引的两个数据的乘积。不过仔细看下这里的输出结果,可以看出,如果一个数据项指定的位置没有对应的值的时候,它是不会参与这个变换过程的:
Observable.zip(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
.subscribe(i -> System.out.print(i + " "));
zip()
方法有多个重载的版本,同时也有功能近似的方法,这里我们挑选有代表性的几个进行说明:
1.public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
2.ublic static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize, ObservableSource... sources)
3.public static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize)
实际上上面几个方法的用法和功能基本类似,区别在于传入的 ObservableSource
的参数的形式。
5.combineLastest
与 zip()
操作类似,但是这个操作的输出结果与 zip()
截然不同,以下面的程序为例,它的输出结果是 36 42 48 54 60
:
Observable.combineLatest(Observable.range(1, 6), Observable.range(6, 5), (integer, integer2) -> integer * integer2)
.subscribe(i -> System.out.print(i + " "));
利用下面的这张图可以比较容易来说明这个问题:
上图中的上面的两条横线代表用于拼接的两个数据项,下面的一条横线是拼接之后的结果。combineLatest()
的作用是拼接最新发射的两个数据。下面我们用上图的过程来说明该方法是如何执行的:开始第一条只有 1 的时候无法拼接;当第二条出现A的时候,此时最新的数据是 1 和 A,故组合成一个 1A;第二个数据项发射了 B,此时最新的数据是 1 和 B,故组合成1B;第一条横线发射了 2,此时最新的数据是 2 和 B,因此得到了 2B,依次类推。然后再回到我们上面的问题,第一个数据项连续发射了 5 个数据的时候,第二个数据项一个都没有发射出来,因此没有任何输出;然后第二个数据项开始发射数据,当第二个数据项发射了 6 的时候,此时最新的数据组合是 6 和 6,故得 36;然后,第二个数据项发射了 7,此时最新的数据组合是 6 和 7,故得 42,依次类推。
该方法也有对应的 combineLatestDelayError()
方法,用途也是只有当所有的数据都发射完毕的时候才去处理错误逻辑。
2.1.5 辅助操作
1.delay
delay()
方法用于在发射数据之前停顿指定的时间,比如下面的程序会在真正地发射数据之前停顿1秒:
Observable.range(1, 5).delay(1000, TimeUnit.MILLISECONDS).subscribe(System.out::print);
Thread.sleep(1500);
同样 delay()
方法也有几个重载的方法,可以供我们用来指定触发的线程等信息,这里给出其中的两个,其他的可以参考源码和文档:
1.public final Observable<T> delay(long delay, TimeUnit unit)
2.public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)
2.do 系列
RxJava 中还有一系列的方法可以供我们使用,它们共同的特点是都是以 do
开头,下面我们列举一下这些方法并简要说明一下它们各自的用途:
1.public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext)
,会在 onNext
方法之后触发;
2.public final Observable<T> doAfterTerminate(Action onFinally)
,会在 Observable 终止之后触发;
3.public final Observable<T> doFinally(Action onFinally)
,当 onComplete
或者 onError
的时候触发;
4.public final Observable<T> doOnDispose(Action onDispose)
,当被 dispose 的时候触发;
5.public final Observable<T> doOnComplete(Action onComplete)
,当 complete 的时候触发;
6.public final Observable<T> doOnEach(final Observer<? super T> observer)
,当每个 onNext
调用的时候触发;
7.public final Observable<T> doOnError(Consumer<? super Throwable> onError)
,当调用 onError
的时候触发;
8.public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
9.public final Observable<T> doOnNext(Consumer<? super T> onNext)
,会在 onNext
的时候触发;
9.public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe)
,会在订阅的时候触发;
10.public final Observable<T> doOnTerminate(final Action onTerminate)
,当终止之前触发。
这些方法可以看作是对操作执行过程的一个监听,当指定的操作被触发的时候会同时触发这些监听方法:
Observable.range(1, 5)
.doOnEach(integerNotification -> System.out.println("Each : " + integerNotification.getValue()))
.doOnComplete(() -> System.out.println("complete"))
.doFinally(() -> System.out.println("finally"))
.doAfterNext(i -> System.out.println("after next : " + i))
.doOnSubscribe(disposable -> System.out.println("subscribe"))
.doOnTerminate(() -> System.out.println("terminal"))
.subscribe(i -> System.out.println("subscribe : " + i));
3.subscribeOn & observeOn
subscribeOn()
用于指定 Observable 自身运行的线程,observeOn()
用于指定发射数据所处的线程,比如 android 中的异步任务需要用observeOn()
指定发射数据所在的线程是非主线程,然后执行完毕之后将结果发送给主线程,就需要用 subscribeOn()
来指定。比如下面的程序,我们用这两个方法来指定所在的线程:
Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
System.out.println(Thread.currentThread());
observableEmitter.onNext(0);
}).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.computation())
.subscribe(integer -> System.out.println(Thread.currentThread()));
最终的输出结果如下所示:
Thread[RxComputationThreadPool-1,5,main]
Thread[RxNewThreadScheduler-1,5,main]
4.timeout
用来设置一个超时时间,如果指定的时间之内没有任何数据被发射出来,那么就会执行我们指定的数据项。如下面的程序所示,我们先为设置了一个间隔 200 毫秒的数字产生器,开始发射数据之前要停顿 1 秒钟,因为我们设置的超时时间是 500 毫秒,因而在第 500 毫秒的时候会执行我们传入的数据项:
Observable.interval(1000, 200, TimeUnit.MILLISECONDS)
.timeout(500, TimeUnit.MILLISECONDS, Observable.rangeLong(1, 5))
.subscribe(System.out::print);
Thread.sleep(2000);
timeout()
方法有多个重载方法,可以为其指定线程等参数,可以参考源码或者文档了解详情。
2.1.6 错误处理操作符
错误处理操作符主要用来提供给 Observable,用来对错误信息做统一的处理,常用的两个是 catch()
和 retry()
。
1.catch
catch 操作会拦截原始的 Observable 的 onError
通知,将它替换为其他数据项或者数据序列,让产生的 Observable 能够正常终止或者根本不终止。在 RxJava 中该操作有 3 种类型:
1.onErrorReturn
:这种操作会在 onError 触发的时候返回一个特殊的项替换错误,并调用观察者的 onCompleted 方法,而不会将错误传递给观察者;
2.onErrorResumeNext
:会在 onErro r触发的时候发射备用的数据项给观察者;
3.onExceptionResumeNext
:如果 onError 触发的时候 onError 收到的 Throwable 不是 Exception,它会将错误传递给观察者的 onError 方法,不会使用备用的 Observable。
下面是 onErrorReturn()
和 onErrorResumeNext()
的程序示例,这里第一段代码会在出现错误的时候输出 666
,而第二段会在出现错误的时候发射数字 12345
:
Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
observableEmitter.onError(null);
observableEmitter.onNext(0);
}).onErrorReturn(throwable -> 666).subscribe(System.out::print);
Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
observableEmitter.onError(null);
observableEmitter.onNext(0);
}).onErrorResumeNext(Observable.range(1,5)).subscribe(System.out::print);
2.retry
retry()
使用了一种错误重试机制,它可以在出现错误的时候进行重试,我们可以通过参数指定重试机制的条件。以下面的程序为例,这里我们设置了当出现错误的时候会进行 2 次重试,因此,第一次的时候出现错误会调用 onNext()
,重试2次又会调用 2 次 onNext()
,第二次重试的时候因为重试又出现了错误,因此此时会触发 onError()
方法。也就是说,下面这段代码会触发 onNext()
3 次,触发 onError()
1 次:
Observable.create(((ObservableOnSubscribe<Integer>) emitter -> {
emitter.onNext(0);
emitter.onError(new Throwable("Error1"));
emitter.onError(new Throwable("Error2"));
})).retry(2).subscribe(i -> System.out.println("onNext : " + i), error -> System.out.print("onError : " + error));
retry()
有几个重载的方法和功能相近的方法,下面是这些方法的定义(选取部分):
1.public final Observable<T> retry()
:会进行无限次地重试;
2.public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate)
3.public final Observable<T> retry(long times)
:指定重试次数;
4.public final Observable<T> retry(long times, Predicate<? super Throwable> predicate)
5.public final Observable<T> retryUntil(final BooleanSupplier stop)
6.public final Observable<T> retryWhen(Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler)
2.1.7 条件操作符和布尔操作符
1.all & any
1.all()
用来判断指定的数据项是否全部满足指定的要求,这里的“要求”可以使用一个函数来指定;
2.any()
用来判断指定的 Observable 是否存在满足指定要求的数据项。
在下面的程序中,我们用该函数来判断指定的数据项是否全部满足大于 5 的要求,显然是不满足的,因此下面的程序将会输出 false
:
Observable.range(5, 5).all(i -> i>5).subscribe(System.out::println); // false
Observable.range(5, 5).any(i -> i>5).subscribe(System.out::println); // true
以下是该方法的定义:
1.public final Single<Boolean> all(Predicate<? super T> predicate)
2.public final Single<Boolean> any(Predicate<? super T> predicate)
2.contains & isEmpty
这两个方法分别用来判断数据项中是否包含我们指定的数据项,已经判断数据项是否为空:
Observable.range(5, 5).contains(4).subscribe(System.out::println); // false
Observable.range(5, 5).isEmpty().subscribe(System.out::println); // false
以下是这两个方法的定义:
1.public final Single<Boolean> isEmpty()
2.public final Single<Boolean> contains(final Object element)
3.sequenceEqual
sequenceEqual()
用来判断两个 Observable 发射出的序列是否是相等的。比如下面的方法用来判断两个序列是否相等:
Observable.sequenceEqual(Observable.range(1,5), Observable.range(1, 5)).subscribe(System.out::println);
4.amb
amb()
作用的两个或多个 Observable,但是只会发射最先发射数据的那个 Observable 的全部数据:
Observable.amb(Arrays.asList(Observable.range(1, 5), Observable.range(6, 5))).subscribe(System.out::print)
该方法及其功能近似的方法的定义,这里前两个是静态的方法,第二个属于实例方法:
1.public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
2.public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
3.public final Observable<T> ambWith(ObservableSource<? extends T> other)
5.defaultIfEmpty
defaultIfEmpty()
用来当指定的序列为空的时候指定一个用于发射的值。下面的程序中,我们直接调用发射器的 onComplete()
方法,因此序列是空的,结果输出一个整数 6
:
Observable.create((ObservableOnSubscribe<Integer>) Emitter::onComplete).defaultIfEmpty(6).subscribe(System.out::print);
下面是该方法的定义:
1.public final Observable<T> defaultIfEmpty(T defaultItem)
2.1.8 转换操作符
1.toList & toSortedList
toList()
和 toSortedList()
用于将序列转换成列表,后者相对于前者增加了排序的功能:
Observable.range(1, 5).toList().subscribe(System.out::println);
Observable.range(1, 5).toSortedList(Comparator.comparingInt(o -> -o)).subscribe(System.out::println);
下面是它们的定义,它们有多个重载版本,这里选择其中的两个进行说明:
1.public final Single<List<T>> toList()
2.public final Single<List<T>> toSortedList(final Comparator<? super T> comparator)
注意一下,这里的返回结果是 Single 类型的,不过这并不妨碍我们继续使用链式操作,因为 Single 的方法和 Observable 基本一致。
另外还要注意这里的 Single 中的参数是一个 List<T>
,也就是说,它把整个序列转换成了一个列表对象。因此,上面的两个示例程序的输出是:
[1, 2, 3, 4, 5]
[5, 4, 3, 2, 1]
2.toMap & toMultimap
toMap()
用于将发射的数据转换成另一个类型的值,它的转换过程是针对每一个数据项的。以下面的代码为例,它会将原始的序列中的每个数字转换成对应的十六进制。但是,toMap()
转换的结果不一定是按照原始的序列的发射的顺序来的:
Observable.range(8, 10).toMap(Integer::toHexString).subscribe(System.out::print);
与 toMap()
近似的是 toMultimap()
方法,它可以将原始序列的每个数据项转换成一个集合类型:
Observable.range(8, 10).toMultimap(Integer::toHexString).subscribe(System.out::print);
上面的两段程序的输出结果是:
{11=17, a=10, b=11, c=12, d=13, e=14, f=15, 8=8, 9=9, 10=16}
{11=[17], a=[10], b=[11], c=[12], d=[13], e=[14], f=[15], 8=[8], 9=[9], 10=[16]}
上面的两个方法的定义是(多个重载,选择部分):
1.public final <K> Single<Map<K, T>> toMap(final Function<? super T, ? extends K> keySelector)
2.public final <K> Single<Map<K, Collection<T>>> toMultimap(Function<? super T, ? extends K> keySelector)
3.toFlowable
该方法用于将一个 Observable 转换成 Flowable 类型,下面是该方法的定义,显然这个方法使用了策略模式,这里面涉及背压相关的内容,我们后续再详细介绍。
public final Flowable<T> toFlowable(BackpressureStrategy strategy)
4.to
相比于上面的方法,to()
方法的限制更加得宽泛,你可以将指定的 Observable 转换成任意你想要的类型(如果你可以做到的话)。下面是一个示例代码,用来将指定的整数序列转换成另一个整数类型的 Observable,只不过这里的每个数据项都是原来的列表中的数据总数的值:
Observable.range(1, 5).to(Observable::count).subscribe(System.out::println);
下面是该方法的定义:
public final <R> R to(Function<? super Observable<T>, R> converter)
2.2 线程控制
之前有提到过 RxJava 的线程控制是通过 subscribeOn()
和 observeOn()
两个方法来完成的。这里我们梳理一下 RxJava 提供的几种线程调度器以及 RxAndroid 为 Android 提供的调度器的使用场景和区别等。
1.Schedulers.io()
:代表适用于 io 操作的调度器,增长或缩减来自适应的线程池,通常用于网络、读写文件等 io 密集型的操作。重点需要注意的是线程池是无限制的,大量的 I/O 调度操作将创建许多个线程并占用内存。
2.Schedulers.computation()
:计算工作默认的调度器,代表 CPU 计算密集型的操作,与 I/O 操作无关。它也是许多 RxJava 方法,比如 buffer()
,debounce()
,delay()
,interval()
,sample()
,skip()
,的默认调度器。
3.Schedulers.newThread()
:代表一个常规的新线程。
4.Schedulers.immediate()
:这个调度器允许你立即在当前线程执行你指定的工作。它是 timeout()
,timeInterval()
以及 timestamp()
方法默认的调度器。
5.Schedulers.trampoline()
:当我们想在当前线程执行一个任务时,并不是立即,我们可以用 trampoline()
将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。它是 repeat()
和 retry()
方法默认的调度器。
以及 RxAndroid 提供的线程调度器:
AndroidSchedulers.mainThread()
用来指代 Android 的主线程
2.3 总结
上面的这些操作也基本适用于 Flowable、Single、Completable 和 Maybe。
我们花费了很多的时间和精力来梳理了这些方法,按照上面的内容,使用 RxJava 实现一些基本的或者高级的操作都不是什么问题。
但是,Observable 更适用于处理一些数据规模较小的问题,当数据规模比较多的时候可能会出现 MissingBackpressureException
异常。因此,我们还需要了解背压和 Flowable 的相关内容才能更好地理解和应用 RxJava.
最后
这是 [响应式编程] 系列的第二部分,完整系列文章如下:
[RxJava 响应式编程] 奉上一篇全面的 RxJava2 方法总结·下
[RxJava 响应式编程] 使用 RxJava 的 Flowable 和背压功能
[RxJava 响应式编程] RxJava 的实际应用示范,用 RxJava 打造 EventBus
[RxJava 响应式编程] 最硬核,RxJava2 源码和原理分析
非常感谢您的关注~
以上是关于[RxJava 响应式编程] 奉上一篇全面的 RxJava2 方法总结·下的主要内容,如果未能解决你的问题,请参考以下文章