[RxJava 响应式编程] 奉上一篇全面的 RxJava2 方法总结·下

Posted Code Brick

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了[RxJava 响应式编程] 奉上一篇全面的 RxJava2 方法总结·下相关的知识,希望对你有一定的参考价值。

[接上文~]

2.1.4 组合操作

1.startWith & startWithArray

startWith() 方法可以用来在指定的数据源的之前插入几个数据,它的功能类似的方法有 startWithArray(),另外还有几个重载方法。这里我们给出一个基本的用法示例,下面的程序会在原始的数字流 1-5 的前面加上 0,所以最终的输出结果是 012345

Observable.range(1,5).startWith(0).subscribe(System.out::print);

下面是 startWith() 及其几个功能相关的方法的定义:

1.public final Observable<T> startWith(Iterable<? extends T> items)
2.public final Observable<T> startWith(ObservableSource<? extends T> other)
3.public final Observable<T> startWith(T item)
4.public final Observable<T> startWithArray(T... items)

2.merge & mergeArray

merge() 可以让多个数据源的数据合并起来进行发射,当然它可能会让 merge() 之后的数据交错发射。下面是一个示例,这个例子中,我们使用 merge() 方法将两个 Observable 合并到了一起进行监听:

Observable.merge(Observable.range(1,5), Observable.range(6,5)).subscribe(System.out::print);

鉴于 merge() 方法及其功能类似的方法太多,我们这里挑选几个比较有代表性的方法,具体的可以查看 RxJava 的源代码:

1.public static <T> Observable<T> merge(Iterable<? extends ObservableSource<? extends T>> sources)
2.public static <T> Observable<T> mergeArray(ObservableSource<? extends T>... sources)
3.public static <T> Observable<T> mergeDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
4.public static <T> Observable<T> mergeArrayDelayError(ObservableSource<? extends T>... sources)

这里的 mergeError() 方法与 merge() 方法的表现一致,只是在处理由 onError() 触发的错误的时候有所不同。mergeError() 方法会等待所有的数据发射完毕之后才把错误发射出来,即使多个错误被触发,该方法也只会发射出一个错误信息。而如果使用 merger() 方法,那么当有错误被触发的时候,该错误会直接被抛出来,并结束发射操作。下面是该方法的一个使用的示例,这里我们主线程停顿 4 秒,然后所有 merge() 的 Observable 中的一个会在线程开始的第 2 秒的时候触发一个错误,该错误最终会在所有的数据发射完毕之后被发射出来:

Observable.mergeDelayError(Observable.range(1,5),
        Observable.range(1,5).repeat(2),
        Observable.create((ObservableOnSubscribe<String>) observableEmitter -> {
            Thread.sleep(2000);
            observableEmitter.onError(new Exception("error"));
        })
).subscribe(System.out::print, System.out::print);
Thread.sleep(4000);

3.concat & concatArray & concatEager

该方法也是用来将多个 Observable 拼接起来,但是它会严格按照传入的 Observable 的顺序进行发射,一个 Observable 没有发射完毕之前不会发射另一个 Observable 里面的数据。下面是一个程序示例,这里传入了两个 Observable,会按照顺序输出 12345678910

Observable.concat(Observable.range(15), Observable.range(65)).subscribe(System.out::print);

下面是该方法的定义,鉴于该方法及其重载方法太多,这里我们选择几个比较有代表性的说明:

1.public static <T> Observable<T> concat(Iterable<? extends ObservableSource<? extends T>> sources)
2.public static <T> Observable<T> concatDelayError(Iterable<? extends ObservableSource<? extends T>> sources)
3.public static <T> Observable<T> concatArray(ObservableSource<? extends T>... sources)
4.public static <T> Observable<T> concatArrayDelayError(ObservableSource<? extends T>... sources)
5.public static <T> Observable<T> concatEager(ObservableSource<? extends ObservableSource<? extends T>> sources)
6.public static <T> Observable<T> concatArrayEager(ObservableSource<? extends T>... sources)

对于 concat() 方法,我们之前已经介绍过它的用法;这里的 conactArray() 的功能与之类似;对于 concatEager() 方法,当一个观察者订阅了它的结果,那么就相当于订阅了它拼接的所有 ObservableSource,并且会先缓存这些 ObservableSource 发射的数据,然后再按照顺序将它们发射出来。而对于这里的 concatDelayError() 方法的作用和前面的 mergeDelayError() 类似,只有当所有的数据都发射完毕才会处理异常。

4.zip & zipArray & zipIterable

zip() 操作用来将多个数据项进行合并,可以通过一个函数指定这些数据项的合并规则。比如下面的程序的输出结果是 6 14 24 36 50,显然这里的合并的规则是相同索引的两个数据的乘积。不过仔细看下这里的输出结果,可以看出,如果一个数据项指定的位置没有对应的值的时候,它是不会参与这个变换过程的:

Observable.zip(Observable.range(16), Observable.range(65), (integer, integer2) -> integer * integer2)
        .subscribe(i -> System.out.print(i + " "));

zip() 方法有多个重载的版本,同时也有功能近似的方法,这里我们挑选有代表性的几个进行说明:

1.public static <T, R> Observable<R> zip(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper)
2.ublic static <T, R> Observable<R> zipArray(Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize, ObservableSource... sources)
3.public static <T, R> Observable<R> zipIterable(Iterable<? extends ObservableSource<? extends T>> sources, Function<? super Object[], ? extends R> zipper, boolean delayError, int bufferSize)

实际上上面几个方法的用法和功能基本类似,区别在于传入的 ObservableSource 的参数的形式。

5.combineLastest

zip() 操作类似,但是这个操作的输出结果与 zip() 截然不同,以下面的程序为例,它的输出结果是 36 42 48 54 60

Observable.combineLatest(Observable.range(16), Observable.range(65), (integer, integer2) -> integer * integer2)
        .subscribe(i -> System.out.print(i + " "));

利用下面的这张图可以比较容易来说明这个问题:

上图中的上面的两条横线代表用于拼接的两个数据项,下面的一条横线是拼接之后的结果。combineLatest() 的作用是拼接最新发射的两个数据。下面我们用上图的过程来说明该方法是如何执行的:开始第一条只有 1 的时候无法拼接;当第二条出现A的时候,此时最新的数据是 1 和 A,故组合成一个 1A;第二个数据项发射了 B,此时最新的数据是 1 和 B,故组合成1B;第一条横线发射了 2,此时最新的数据是 2 和 B,因此得到了 2B,依次类推。然后再回到我们上面的问题,第一个数据项连续发射了 5 个数据的时候,第二个数据项一个都没有发射出来,因此没有任何输出;然后第二个数据项开始发射数据,当第二个数据项发射了 6 的时候,此时最新的数据组合是 6 和 6,故得 36;然后,第二个数据项发射了 7,此时最新的数据组合是 6 和 7,故得 42,依次类推。

该方法也有对应的 combineLatestDelayError() 方法,用途也是只有当所有的数据都发射完毕的时候才去处理错误逻辑。

2.1.5 辅助操作

1.delay

delay() 方法用于在发射数据之前停顿指定的时间,比如下面的程序会在真正地发射数据之前停顿1秒:

Observable.range(15).delay(1000, TimeUnit.MILLISECONDS).subscribe(System.out::print);
Thread.sleep(1500);

同样 delay() 方法也有几个重载的方法,可以供我们用来指定触发的线程等信息,这里给出其中的两个,其他的可以参考源码和文档:

1.public final Observable<T> delay(long delay, TimeUnit unit)
2.public final Observable<T> delay(long delay, TimeUnit unit, Scheduler scheduler)

2.do 系列

RxJava 中还有一系列的方法可以供我们使用,它们共同的特点是都是以 do 开头,下面我们列举一下这些方法并简要说明一下它们各自的用途:

1.public final Observable<T> doAfterNext(Consumer<? super T> onAfterNext),会在 onNext 方法之后触发;
2.public final Observable<T> doAfterTerminate(Action onFinally),会在 Observable 终止之后触发;
3.public final Observable<T> doFinally(Action onFinally),当 onComplete 或者 onError 的时候触发;
4.public final Observable<T> doOnDispose(Action onDispose),当被 dispose 的时候触发;
5.public final Observable<T> doOnComplete(Action onComplete),当 complete 的时候触发;
6.public final Observable<T> doOnEach(final Observer<? super T> observer),当每个 onNext 调用的时候触发;
7.public final Observable<T> doOnError(Consumer<? super Throwable> onError),当调用 onError 的时候触发;
8.public final Observable<T> doOnLifecycle(final Consumer<? super Disposable> onSubscribe, final Action onDispose)
9.public final Observable<T> doOnNext(Consumer<? super T> onNext),会在 onNext 的时候触发;
9.public final Observable<T> doOnSubscribe(Consumer<? super Disposable> onSubscribe),会在订阅的时候触发;
10.public final Observable<T> doOnTerminate(final Action onTerminate),当终止之前触发。

这些方法可以看作是对操作执行过程的一个监听,当指定的操作被触发的时候会同时触发这些监听方法:

Observable.range(15)
        .doOnEach(integerNotification -> System.out.println("Each : " + integerNotification.getValue()))
        .doOnComplete(() -> System.out.println("complete"))
        .doFinally(() -> System.out.println("finally"))
        .doAfterNext(i -> System.out.println("after next : " + i))
        .doOnSubscribe(disposable -> System.out.println("subscribe"))
        .doOnTerminate(() -> System.out.println("terminal"))
        .subscribe(i -> System.out.println("subscribe : " + i));

3.subscribeOn & observeOn

subscribeOn() 用于指定 Observable 自身运行的线程,observeOn() 用于指定发射数据所处的线程,比如 android 中的异步任务需要用observeOn() 指定发射数据所在的线程是非主线程,然后执行完毕之后将结果发送给主线程,就需要用 subscribeOn() 来指定。比如下面的程序,我们用这两个方法来指定所在的线程:

Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
    System.out.println(Thread.currentThread());
    observableEmitter.onNext(0);
}).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.computation())
        .subscribe(integer -> System.out.println(Thread.currentThread()));

最终的输出结果如下所示:

Thread[RxComputationThreadPool-1,5,main]
Thread[RxNewThreadScheduler-1,5,main]

4.timeout

用来设置一个超时时间,如果指定的时间之内没有任何数据被发射出来,那么就会执行我们指定的数据项。如下面的程序所示,我们先为设置了一个间隔 200 毫秒的数字产生器,开始发射数据之前要停顿 1 秒钟,因为我们设置的超时时间是 500 毫秒,因而在第 500 毫秒的时候会执行我们传入的数据项:

Observable.interval(1000200, TimeUnit.MILLISECONDS)
        .timeout(500, TimeUnit.MILLISECONDS, Observable.rangeLong(15))
        .subscribe(System.out::print);
Thread.sleep(2000);

timeout() 方法有多个重载方法,可以为其指定线程等参数,可以参考源码或者文档了解详情。

2.1.6 错误处理操作符

错误处理操作符主要用来提供给 Observable,用来对错误信息做统一的处理,常用的两个是 catch()retry()

1.catch

catch 操作会拦截原始的 Observable 的 onError 通知,将它替换为其他数据项或者数据序列,让产生的 Observable 能够正常终止或者根本不终止。在 RxJava 中该操作有 3 类型:

1.onErrorReturn:这种操作会在 onError 触发的时候返回一个特殊的项替换错误,并调用观察者的 onCompleted 方法,而不会将错误传递给观察者;
2.onErrorResumeNext:会在 onErro r触发的时候发射备用的数据项给观察者;
3.onExceptionResumeNext:如果 onError 触发的时候 onError 收到的 Throwable 不是 Exception,它会将错误传递给观察者的 onError 方法,不会使用备用的 Observable。

下面是 onErrorReturn()onErrorResumeNext() 的程序示例,这里第一段代码会在出现错误的时候输出 666,而第二段会在出现错误的时候发射数字 12345

Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
    observableEmitter.onError(null);
    observableEmitter.onNext(0);
}).onErrorReturn(throwable -> 666).subscribe(System.out::print);

Observable.create((ObservableOnSubscribe<Integer>) observableEmitter -> {
    observableEmitter.onError(null);
    observableEmitter.onNext(0);
}).onErrorResumeNext(Observable.range(1,5)).subscribe(System.out::print);

2.retry

retry() 使用了一种错误重试机制,它可以在出现错误的时候进行重试,我们可以通过参数指定重试机制的条件。以下面的程序为例,这里我们设置了当出现错误的时候会进行 2 次重试,因此,第一次的时候出现错误会调用 onNext(),重试2次又会调用 2 次 onNext(),第二次重试的时候因为重试又出现了错误,因此此时会触发 onError() 方法。也就是说,下面这段代码会触发 onNext() 3 次,触发 onError() 1 次:

Observable.create(((ObservableOnSubscribe<Integer>) emitter -> {
    emitter.onNext(0);
    emitter.onError(new Throwable("Error1"));
    emitter.onError(new Throwable("Error2"));
})).retry(2).subscribe(i -> System.out.println("onNext : " + i), error -> System.out.print("onError : " + error));

retry() 有几个重载的方法和功能相近的方法,下面是这些方法的定义(选取部分):

1.public final Observable<T> retry():会进行无限次地重试;
2.public final Observable<T> retry(BiPredicate<? super Integer, ? super Throwable> predicate)
3.public final Observable<T> retry(long times):指定重试次数;
4.public final Observable<T> retry(long times, Predicate<? super Throwable> predicate)
5.public final Observable<T> retryUntil(final BooleanSupplier stop)
6.public final Observable<T> retryWhen(Function<? super Observable<Throwable>, ? extends ObservableSource<?>> handler)

2.1.7 条件操作符和布尔操作符

1.all & any

1.all() 用来判断指定的数据项是否全部满足指定的要求,这里的“要求”可以使用一个函数来指定;
2.any() 用来判断指定的 Observable 是否存在满足指定要求的数据项。

在下面的程序中,我们用该函数来判断指定的数据项是否全部满足大于 5 的要求,显然是不满足的,因此下面的程序将会输出 false

Observable.range(55).all(i -> i>5).subscribe(System.out::println); // false
Observable.range(55).any(i -> i>5).subscribe(System.out::println); // true

以下是该方法的定义:

1.public final Single<Boolean> all(Predicate<? super T> predicate)
2.public final Single<Boolean> any(Predicate<? super T> predicate)

2.contains & isEmpty

这两个方法分别用来判断数据项中是否包含我们指定的数据项,已经判断数据项是否为空:

Observable.range(55).contains(4).subscribe(System.out::println); // false
Observable.range(55).isEmpty().subscribe(System.out::println); // false

以下是这两个方法的定义:

1.public final Single<Boolean> isEmpty()
2.public final Single<Boolean> contains(final Object element)

3.sequenceEqual

sequenceEqual() 用来判断两个 Observable 发射出的序列是否是相等的。比如下面的方法用来判断两个序列是否相等:

Observable.sequenceEqual(Observable.range(1,5), Observable.range(15)).subscribe(System.out::println);

4.amb

amb() 作用的两个或多个 Observable,但是只会发射最先发射数据的那个 Observable 的全部数据:

Observable.amb(Arrays.asList(Observable.range(15), Observable.range(65))).subscribe(System.out::print)

该方法及其功能近似的方法的定义,这里前两个是静态的方法,第二个属于实例方法:

1.public static <T> Observable<T> amb(Iterable<? extends ObservableSource<? extends T>> sources)
2.public static <T> Observable<T> ambArray(ObservableSource<? extends T>... sources)
3.public final Observable<T> ambWith(ObservableSource<? extends T> other)

5.defaultIfEmpty

defaultIfEmpty() 用来当指定的序列为空的时候指定一个用于发射的值。下面的程序中,我们直接调用发射器的 onComplete() 方法,因此序列是空的,结果输出一个整数 6

Observable.create((ObservableOnSubscribe<Integer>) Emitter::onComplete).defaultIfEmpty(6).subscribe(System.out::print);

下面是该方法的定义:

1.public final Observable<T> defaultIfEmpty(T defaultItem)

2.1.8 转换操作符

1.toList & toSortedList

toList()toSortedList() 用于将序列转换成列表,后者相对于前者增加了排序的功能:

Observable.range(15).toList().subscribe(System.out::println);
Observable.range(15).toSortedList(Comparator.comparingInt(o -> -o)).subscribe(System.out::println);

下面是它们的定义,它们有多个重载版本,这里选择其中的两个进行说明:

1.public final Single<List<T>> toList()
2.public final Single<List<T>> toSortedList(final Comparator<? super T> comparator)

注意一下,这里的返回结果是 Single 类型的,不过这并不妨碍我们继续使用链式操作,因为 Single 的方法和 Observable 基本一致。

另外还要注意这里的 Single 中的参数是一个 List<T>,也就是说,它把整个序列转换成了一个列表对象。因此,上面的两个示例程序的输出是:

[12345]
[54321]

2.toMap & toMultimap

toMap() 用于将发射的数据转换成另一个类型的值,它的转换过程是针对每一个数据项的。以下面的代码为例,它会将原始的序列中的每个数字转换成对应的十六进制。但是,toMap() 转换的结果不一定是按照原始的序列的发射的顺序来的:

Observable.range(810).toMap(Integer::toHexString).subscribe(System.out::print);

toMap() 近似的是 toMultimap() 方法,它可以将原始序列的每个数据项转换成一个集合类型:

Observable.range(810).toMultimap(Integer::toHexString).subscribe(System.out::print);

上面的两段程序的输出结果是:

{11=17, a=10, b=11, c=12, d=13, e=14, f=158=89=910=16}
{11=[17], a=[10], b=[11], c=[12], d=[13], e=[14], f=[15], 8=[8], 9=[9], 10=[16]}

上面的两个方法的定义是(多个重载,选择部分):

1.public final <K> Single<Map<K, T>> toMap(final Function<? super T, ? extends K> keySelector)
2.public final <K> Single<Map<K, Collection<T>>> toMultimap(Function<? super T, ? extends K> keySelector)

3.toFlowable

该方法用于将一个 Observable 转换成 Flowable 类型,下面是该方法的定义,显然这个方法使用了策略模式,这里面涉及背压相关的内容,我们后续再详细介绍。

public final Flowable<T> toFlowable(BackpressureStrategy strategy)

4.to

相比于上面的方法,to() 方法的限制更加得宽泛,你可以将指定的 Observable 转换成任意你想要的类型(如果你可以做到的话)。下面是一个示例代码,用来将指定的整数序列转换成另一个整数类型的 Observable,只不过这里的每个数据项都是原来的列表中的数据总数的值:

Observable.range(15).to(Observable::count).subscribe(System.out::println);

下面是该方法的定义:

public final <R> R to(Function<? super Observable<T>, R> converter)

2.2 线程控制

之前有提到过 RxJava 的线程控制是通过 subscribeOn()observeOn() 两个方法来完成的。这里我们梳理一下 RxJava 提供的几种线程调度器以及 RxAndroid 为 Android 提供的调度器的使用场景和区别等。

1.Schedulers.io():代表适用于 io 操作的调度器,增长或缩减来自适应的线程池,通常用于网络、读写文件等 io 密集型的操作。重点需要注意的是线程池是无限制的,大量的 I/O 调度操作将创建许多个线程并占用内存。
2.Schedulers.computation():计算工作默认的调度器,代表 CPU 计算密集型的操作,与 I/O 操作无关。它也是许多 RxJava 方法,比如 buffer(),debounce(),delay(),interval(),sample(),skip(),的默认调度器。
3.Schedulers.newThread():代表一个常规的新线程。
4.Schedulers.immediate():这个调度器允许你立即在当前线程执行你指定的工作。它是 timeout(),timeInterval() 以及 timestamp() 方法默认的调度器。
5.Schedulers.trampoline():当我们想在当前线程执行一个任务时,并不是立即,我们可以用 trampoline() 将它入队。这个调度器将会处理它的队列并且按序运行队列中每一个任务。它是 repeat()retry() 方法默认的调度器。

以及 RxAndroid 提供的线程调度器:

AndroidSchedulers.mainThread() 用来指代 Android 的主线程

2.3 总结

上面的这些操作也基本适用于 Flowable、Single、Completable 和 Maybe。

我们花费了很多的时间和精力来梳理了这些方法,按照上面的内容,使用 RxJava 实现一些基本的或者高级的操作都不是什么问题。

但是,Observable 更适用于处理一些数据规模较小的问题,当数据规模比较多的时候可能会出现 MissingBackpressureException 异常。因此,我们还需要了解背压和 Flowable 的相关内容才能更好地理解和应用 RxJava.

最后

这是 [响应式编程] 系列的第二部分,完整系列文章如下:

  • [RxJava 响应式编程] 奉上一篇全面的 RxJava2 方法总结·下

  • [RxJava 响应式编程] 使用 RxJava 的 Flowable 和背压功能

  • [RxJava 响应式编程] RxJava 的实际应用示范,用 RxJava 打造 EventBus

  • [RxJava 响应式编程] 最硬核,RxJava2 源码和原理分析

公众号信息

非常感谢您的关注~


以上是关于[RxJava 响应式编程] 奉上一篇全面的 RxJava2 方法总结·下的主要内容,如果未能解决你的问题,请参考以下文章

Rx系列---响应式编程

响应式编程

一文带你全面了解RxJava

响应式编程入门(RxJava)

RxJava原理解析

响应式编程实战——新版RxJS实现真正双击事件流