RxJava-操作符

Posted jiangyue2780

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava-操作符相关的知识,希望对你有一定的参考价值。

基本定义

  • Observable – 被观察者(事件源)
  • Observer – 观察者

Creating Observables(创建操作)

以下展示了创建Observable的各种方法
  • Create — 通过observer的方法创建一个新的Observable
  • Defer — 直到observer订阅的时候才会创建这个Observable,并且为每一个observer创建一个Observable
  • Empty/Never/Throw — 创建行为受限特殊的Observable
    Empty — 只会发射onCompleted事件
    Never — 不会发射任事件和数据
    Throw — 只会发射onError
  • From — 将现有的一些对象集或数据结构转换到一个Observable里,然后逐个发射
  • Interval — 返回一个Observable,它按固定的时间间隔发射一个无限递增的整数序列。
  • Just — 类似from,但just只是单纯的把传入对象发射出去,from会把列表中数据取出来逐个发射
  • Range — 创建一个发射指定范围内的有序整数序列的Observable
  • Repeat — 指定当前Observable可以重复发射
  • Start — 返回一个Observable,它发射一个类似于函数声明的返回值
  • Timer — 创建一个延时发射的Observable

Transforming Observables(变换操作)

以下是可用于对Observable发射的数据执行变换操作的各种操作符
  • Buffer — 可以理解为缓存,定期从Observable中收集数据放进一个数据包裹,然后把这些数据打包发射,而不是一次发射一个值。
  • FlatMap — 使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。
  • GroupBy — 将原始Observable按不同的key分组拆分为一些Observables集合,key相同的数据会被同一个Observable发射。
  • Map — 将Observable发射的数据通过一个函数进行变换,然后返回一个发射这些结果的Observable。
  • Scan — 将发射的数据通过一个函数进行变换,然后将变换后的结果作为参数跟下一个发射的数据一起继续通过那个函数变换,这样依次连续发射得到最终结果
  • Window — 定期将来自原始Observable的数据分解为一个Observable窗口,发射这些窗口,而不是每次发射一项数据,
    Window和Buffer类似,但不是发射来自原始Observable的数据包,它发射的是Observables,这些Observables中的每一个都发射原始Observable数据的一个子集,最后发射一个onCompleted通知。

Filtering Observables(过滤操作)

  • Debounce — 仅在过了一段指定的时间还没发射数据时才发射一个数据,Debounce操作符会过滤掉发射速率过快的数据项,意思就是在指定时间内如果发射过多个数据,在最后只会发射最近的哪次数据。
  • Distinct — 过滤掉重复的数据项
  • ElementAt — 只发射第N项数据
  • Filter — 只发射过滤函数返回true的数据项
  • First — 只发射第一项(或者满足某个条件的第一项)数据
  • IgnoreElements — 不发射任何数据,只发射Observable的终止通知(onError、onCompleted),当你不关心发射数据,只关心成功与否时可以使用该操作符。
  • Last — 只发射最后一项(或者满足某个条件的最后一项)数据
  • Sample — 定期发射Observable最近发射的数据项
  • Skip — 过滤Observable发射的前N项数据
  • SkipLast — 过滤Observable发射的后N项数据
  • Take — 只发射Observable发射的前面的N项数据
  • TakeLast — 只发射Observable发射的最后N项数据

Combining Observables(结合操作)

 以下操作符可用于组合多个Observables。
  • And/Then/When — 使用Pattern和Plan作为中介,将两个或多个Observable发射的数据集合并到一起。
    And/Then/When操作符组合的行为类似于zip,但是它们使用一个中间数据结构。接受两个或多个Observable,一次一个将它们的发射物合并到Pattern对象,然后操作那个Pattern对象,变换为一个Plan。随后将这些Plan变换为Observable的发射物。
  • CombineLatest — 当两个Observables中的任何一个发射了数据时,使用一个函数结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据,多个同理。
  • Join — 任何时候,只要在另一个Observable发射的数据定义的时间范围内,这个Observable发射了一条数据,就结合两个Observable发射的数据;换句话说,在其中一个Observable发射数据时,只要另一个Observable发射的数据还没过期(会有2个函数定义各自的有效时间窗口),就结合2个Observable发射的数据。
  • Merge — 可以将多个Observables的输出合并,就好像它们是一个单个的Observable一样。
  • StartWith — 在发射数据最前面添加发射数据项。
  • Switch — 将一个发射多个Observables的Observable转换成另一个单独的Observable,后者发射最近一个被发射的Observable的发射物,意味着在最近被发射的Observable之前被发射的Observable的发射物都将被丢弃。
    通俗点讲,先前发射Observables中A的发射物,切换后就开始发B的发射物了。。
  • Zip — 通过一个函数合并多个Observable数据发射项,基于这个函数结果为每个合并项发射单个数据项。

Error Handling Operators(错误处理)

很多操作符可用于对Observable发射的onError通知做出响应或者从错误中恢复,例如,你可以:

  1. 吞掉这个错误,切换到一个备用的Observable继续发射数据
  2. 吞掉这个错误然后发射默认值
  3. 吞掉这个错误并立即尝试重启这个Observable
  4. 吞掉这个错误,在一些回退间隔后重启这个Observable

这是操作符列表:

Catch:

  • onErrorResumeNext — 指示Observable在遇到错误时发射一个数据序列
  • onErrorReturn — 指示Observable在遇到错误时发射一个特定的数据
  • onExceptionResumeNext — instructs an Observable to continue
    emitting items after it encounters an exception (but not another
    variety of throwable)指示Observable遇到错误时继续发射数据

Retry:

  • retry — 指示Observable遇到错误时重试
  • retryWhen — 指示Observable遇到错误时,将错误传递给另一个Observable来决定是否要重新给订阅这个Observable

Observable Utility Operators(辅助操作)

  • Delay — 延迟一段指定的时间再发射来自Observable的发射物
  • Do — 相当于注册一个跟Observable某一个生命周期相关的一个回调方法,比如doOnNext、doOnError等等。
  • Materialize — Materialize将数据项和事件通知都当做数据项封装成Notification对象发射,Dematerialize刚好相反。
  • ObserveOn — 指定一个观察者在哪个调度器上观察这个Observable,RxJava上调度器就是不同线程。
  • Serialize — 强制一个Observable连续调用并保证行为正确,保证行为正确是指调用观察者的事件方法顺序正确调用,因为我们的观察方法可能是从不同线程异步调用的,就可能会导致行为不正确,使用serialize可以纠正。
  • Subscribe — 操作来自Observable的发射物和通知;
    Subscribe操作符是连接观察者和Observable的胶水。一个观察者要想看到Observable发射的数据项,或者想要从Observable获取错误和完成通知,它首先必须使用这个操作符订阅那个Observable。
    Subscribe操作符的一般实现可能会接受一到三个方法(然后由观察者组合它们),或者接受一个实现了包含这三个方法的接口的对象(有时叫做Observer或Subscriber):
    • onNext — 每当Observable发射了一项数据它就会调用这个方法。这个方法的参数是这个Observable发射的数据项。
    • onError — 当Observable无法生成期待的数据或遇到其他问题时会调用此方法,参数是一个Exception或Throwable。
    • onCompleted — 如果没有遇到任何错误,Observable再最后会调用这个方法。
  • SubscribeOn — 指定Observable自身在哪个调度器上执行。
  • TimeInterval — 将一个发射数据的Observable转换为发射那些数据发射时间间隔的Observable,即新Observable只发射旧Observable相邻2个发射物之间的发射时间间隔,第一个时间间隔为原始Observable刚开始订阅到发射第一个数据项之间的时间间隔。
  • TimeOut — 指定时长内,如果Observable还没有发射数据,则发送一个错误通知,可能是onError,也可能是切换到一个备用Observable,RxJava有几种的实现变体。
  • TimeStamp — 给Observable发射的数据项附加一个时间戳,RxJava中的实现为timestamp,它将一个发射T类型数据的Observable转换为一个发射类型为Timestamped的数据的Observable,每一项都包含数据的原始发射时间。
  • Using — 该操作符让你可以指示Observable创建一个只在它的生命周期内存在的资源,当Observable终止时这个资源会被自动释放。
    using操作符接受三个参数:

    1. 一个用户创建一次性资源的工厂函数
    2. 一个用于创建Observable的工厂函数
    3. 一个用于释放资源的函数

    当一个观察者订阅using返回的Observable时,using将会使用Observable工厂函数创建观察者要观察的Observable,同时使用资源工厂函数创建一个你想要创建的资源。当观察者取消订阅这个Observable时,或者当观察者终止时(无论是正常终止还是因错误而终止),using使用第三个函数释放它创建的资源。

  • To — 将Observable转换为另一个对象或数据结构。
    该操作符让你可以将Observable或者Observable发射的数据序列转换为另一个对象或数据结构,然后返回一个发射那个对象或数据结构的Observable。

Conditional and Boolean Operators(条件和布尔操作)

以下操作符可用于根据条件发射或变换Observables,或者对它们做布尔运算。
  • All — 提供一个函数判断一个Observable的所有发射数据项是否满足某个条件。
  • Amb — 给定两个或多个Observables,它只发射首先发射数据或通知的那个Observable的所有数据,丢弃其他Observable发射的数据。
  • Contains — 判定一个Observable是否发射一个特定的值。
  • DefaultIfEmpty — 发射来自原始Observable的值,如果原始Observable没有发射任何值,就发射一个默认值。
  • SequenceEqual — 传递两个Observable给SequenceEqual操作符,它会比较两个Observable的发射物,如果两个序列是相同的(相同的数据,相同的顺序,相同的终止状态),它就发射true,否则发射false。
  • SkipUtil — 丢弃原始Observable发射的数据,直到第二个Observable发射了一项数据。
    SkipUntil订阅原始的Observable,但是忽略它的发射物,直到第二个Observable发射了一项数据那一刻,它开始发射原始Observable。
  • SkipWhile — 丢弃Observable发射的数据,直到一个指定的条件不成立。
    SkipWhile订阅原始的Observable,但是忽略它的发射物,直到你指定的某个条件变为false的那一刻,它开始发射原始Observable。
  • TakeUtil — 当第二个Observable发射了一项数据或者终止时,丢弃原始Observable后面发射的任何数据。
    TakeUntil订阅并开始发射原始Observable,它还监视你提供的第二个Observable。如果第二个Observable发射了一项数据或者发射了一个终止通知,TakeUntil返回的Observable会停止发射原始Observable并终止。
  • TakeWhile — 发射Observable发射的数据,直到一个指定的条件不成立。
    TakeWhile发射原始Observable,直到你指定的某个条件不成立的那一刻,它停止发射原始Observable,并终止自己的Observable。

Mathematical and Aggregate Operators(算术和聚合操作)

下面的操作符用于对整个序列执行算法操作或其它操作,由于这些操作必须等待数据发射完成(通常也必须缓存这些数据),它们对于非常长或者无限的序列来说是危险的,不推荐使用。
  • Average — 计算原始Observable发射数字的平均值并发射它。
  • Concat — 不交错的发射两个或多个Observable的发射物,即Observable是一个一个执行发射的,第一个发射完毕第二个Observable才会开始发射。
  • Count — 计算原始Observable发射物的数量,然后只发射这个值。
  • Max — 发射原始Observable的最大值。
  • Min — 发射原始Observable的最小值。
  • Reduce — 按顺序对Observable发射的每项数据应用一个函数并发射最终的值。
  • Sum — 计算Observable发射的数值的和并发射这个和。

Connectable Observable Operators(连接操作)

  • Publish — 将普通的Observable转换为可连接的Observable。
    可连接的Observable (connectable Observable)与普通的Observable差不多,不过它并不会在被订阅时开始发射数据,而是直到使用了Connect操作符时才会开始。用这个方法,你可以等待所有的观察者都订阅了Observable之后再开始发射数据。
  • Connect — 让一个可连接的Observable开始发射数据给订阅者。
    RxJava中connect是ConnectableObservable接口的一个方法。
  • RefCount — 让一个可连接的Observable行为像普通的Observable。
    RefCount操作符把从一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者订阅这个Observable时,RefCount连接到下层的可连接Observable。RefCount跟踪有多少个观察者订阅它,直到最后一个观察者完成才断开与下层可连接Observable的连接。
  • Replay — 保证所有的观察者收到相同的数据序列,即使它们在Observable开始发射数据之后才订阅。

常用的操作符基本都在上面了,当然还有海量的操作符需要平时用到的时候去积累了~

以上是关于RxJava-操作符的主要内容,如果未能解决你的问题,请参考以下文章

使用 RxJava 将数据从 ViewModel 移动到 Fragment

使用 rxJava 和改造多次调用另一个请求中的请求

RxJava编程思想2-(实现简易版Rxjava,如何实现操作符?)

RxJava编程思想2-(实现简易版Rxjava,如何实现操作符?)

Android 勤用RXJava compose操作符消除重复代码

RxJava操作符系列一(上)