Android :RxJava学习笔记之合并操作符
Posted JMW1407
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Android :RxJava学习笔记之合并操作符相关的知识,希望对你有一定的参考价值。
RxJava ——合并操作符
合并操作符
组合 多个被观察者(Observable) & 合并需要发送的事件,也就是是将两个或多个被观察者合并为一个被观察者,并向观察者传递事件。
1、concat() / concatArray()
组合多个被观察者一起发送数据,合并后 按发送顺序串行执行
- 二者区别:组合被观察者的数量,即
concat()组合被观察者数量≤4个
,而concatArray()则可>4个
Observable
.concat(Observable.just(1, 2, 3),
Observable.just(4, 5, 6),
Observable.just(7, 8, 9),
Observable.just(10, 11, 12))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
System.out.println("接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
System.out.println("对Error事件作出响应" );
}
@Override
public void onComplete() {
System.out.println("对Complete事件作出响应");
}
});
输出:
接收到了事件1
接收到了事件2
接收到了事件3
接收到了事件4
接收到了事件5
接收到了事件6
接收到了事件7
接收到了事件8
接收到了事件9
接收到了事件10
接收到了事件11
接收到了事件12
对Complete事件作出响应
// concatArray():组合多个被观察者一起发送数据(可>4个)
// 注:串行执行
Observable.concatArray(Observable.just(1, 2, 3),
Observable.just(4, 5, 6),
Observable.just(7, 8, 9),
Observable.just(10, 11, 12),
Observable.just(13, 14, 15))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
Log.d(TAG, "接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
Log.d(TAG, "对Error事件作出响应");
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
2、merge() / mergeArray()
组合多个被观察者一起发送数据,合并后 按时间线并行执行
二者区别:
- 组合被观察者的数量,即
merge()组合被观察者数量≤4个
,而mergeArray()则可>4个
- 区别上述concat()操作符:同样是组合多个被观察者一起发送数据,但concat()操作符合并后是按发送顺序串行执行
concat
Observable
.concat(
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
// 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)
// 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
System.out.println("接收到了事件"+ value);
}
@Override
public void onError(Throwable e) {
System.out.println("对Error事件作出响应" );
}
@Override
public void onComplete() {
System.out.println("对Complete事件作出响应" );
}
});
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
e.printStackTrace();
}
输出
接收到了事件0
接收到了事件1
接收到了事件2
接收到了事件2
接收到了事件3
接收到了事件4
对Complete事件作出响应
merge()
Observable
.merge(
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS),
// 从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
Observable.intervalRange(2, 3, 1, 1, TimeUnit.SECONDS)
// 从2开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
)
.subscribe(new Observer<Long>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Long value) {
System.out.println("接收到了事件"+ value);
}
@Override
public void onError(Throwable e) {
System.out.println("对Error事件作出响应" );
}
@Override
public void onComplete() {
System.out.println("对Complete事件作出响应" );
}
});
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
e.printStackTrace();
}
输出
接收到了事件0
接收到了事件2
接收到了事件1
接收到了事件3
接收到了事件2
接收到了事件4
对Complete事件作出响应
3、concatDelayError() / mergeDelayError()
Observable.concat(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
System.out.println("接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
System.out.println("对Error事件作出响应" );
}
@Override
public void onComplete() {
System.out.println("对Complete事件作出响应");
}
});
输出
接收到了事件1
接收到了事件2
接收到了事件3
对Error事件作出响应
Observable.concatArrayDelayError(
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onError(new NullPointerException()); // 发送Error事件,因为无使用concatDelayError,所以第2个Observable将不会发送事件
emitter.onComplete();
}
}),
Observable.just(4, 5, 6))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
System.out.println("接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
System.out.println("对Error事件作出响应" );
}
@Override
public void onComplete() {
System.out.println("对Complete事件作出响应");
}
});
输出
接收到了事件1
接收到了事件2
接收到了事件3
接收到了事件4
接收到了事件5
接收到了事件6
对Error事件作出响应
4、Zip()
合并 多个被观察者(Observable)发送的事件,生成一个新的事件序列(即组合过后的事件序列),并最终发送
特别注意:
- 事件组合方式 = 严格按照原先事件序列 进行对位合并
- 最终合并的事件数量 = 多个被观察者(Observable)中数量最少的数量
Observable<Integer> observable1 = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
System.out.println("被观察者1发送了事件1");
emitter.onNext(1);
// 为了方便展示效果,所以在发送事件后加入2s的延迟
Thread.sleep(1000);
System.out.println("被观察者1发送了事件2");
emitter.onNext(2);
Thread.sleep(1000);
System.out.println("被观察者1发送了事件3");
emitter.onNext(3);
Thread.sleep(1000);
emitter.onComplete();
}
}).subscribeOn(Schedulers.io()); // 设置被观察者1在工作线程1中工作
Observable<String> observable2 = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
System.out.println("被观察者2发送了事件A");
emitter.onNext("A");
Thread.sleep(1000);
System.out.println("被观察者2发送了事件B");
emitter.onNext("B");
Thread.sleep(1000);
System.out.println("被观察者2发送了事件C");
emitter.onNext("C");
Thread.sleep(1000);
System.out.println("被观察者2发送了事件D");
emitter.onNext("D");
emitter.onComplete();
}
}).subscribeOn(Schedulers.newThread());// 设置被观察者2在工作线程2中工作
// 假设不作线程控制,则该两个被观察者会在同一个线程中工作,即发送事件存在先后顺序,而不是同时发送
// 注:创建BiFunction对象传入的第3个参数 = 合并后数据的数据类型
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String string) throws Exception {
return integer + string;
}
}).subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
System.out.println("onSubscribe");
}
@Override
public void onNext(String s) {
System.out.println("接收到了事件"+ s );
}
@Override
public void onError(Throwable e) {
System.out.println("对Error事件作出响应" );
}
@Override
public void onComplete() {
System.out.println("对Complete事件作出响应");
}
});
try {
Thread.sleep(8000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
输出
onSubscribe
被观察者1发送了事件1
被观察者2发送了事件A
接收到了事件1A
被观察者2发送了事件B
被观察者1发送了事件2
接收到了事件2B
被观察者2发送了事件C
被观察者1发送了事件3
接收到了事件3C
被观察者2发送了事件D
对Complete事件作出响应
5、combineLatest()
当两个Observables中的任何一个发送了数据后,将先发送了数据的Observables 的最新(最后)一个数据 与 另外一个Observable发送的每个数据结合,最终基于该函数的结果发送数据
- 与Zip()的区别:
Zip() = 按个数合并
,即1对1合并; CombineLatest() = 按时间合并
,即在同一个时间点上合并
可能上面这张图不是太好理解,可以看下面这张图:
combineLatest()发送事件的序列是与发送的时间线有关的。拿上图解释当发送A之后会从上一个Observ拿最近发送的1进行组合生成‘1A’,当发送2时拿第二个Observable最近发送的数据B组合成‘2B’,接下来到事件C时还是取第一个Observable最近发送的时间2进行组合成‘2C’,以此类推。
Observable.combineLatest(
Observable.just(1L, 2L, 3L), // 第1个发送数据事件的Observable
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long o1, Long o2) throws Exception {
// o1 = 第1个Observable发送的最新(最后)1个数据
// o2 = 第2个Observable发送的每1个数据
System.out.println("合并的数据是: "+ o1 + " "+ o2);
return o1 + o2;
// 合并的逻辑 = 相加
// 即第1个Observable发送的最后1个数据 与 第2个Observable发送的每1个数据进行相加
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long s) throws Exception {
System.out.println("合并的结果是: "+s);
}
});
try {
Thread.sleep(40000);
} catch (InterruptedException e) {
e.printStackTrace();
}
输出
合并的数据是: 3 0
合并的结果是: 3
合并的数据是: 3 1
合并的结果是: 4
合并的数据是: 3 2
合并的结果是: 5
Observable.combineLatest(
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第1个发送数据事件的Observable
Observable.intervalRange(0, 3, 1, 1, TimeUnit.SECONDS), // 第2个发送数据事件的Observable:从0开始发送、共发送3个数据、第1次事件延迟发送时间 = 1s、间隔时间 = 1s
new BiFunction<Long, Long, Long>() {
@Override
public Long apply(Long o1, Long o2) throws Exception {
// o1 = 第1个Observable发送的最新(最后)1个数据
// o2 = 第2个Observable发送的每1个数据
System.out.println("合并的数据是: "+ o1 + " "+ o2);
return o1 + o2;
// 合并的逻辑 = 相加
// 即第1个Observable发送的最后1个数据 与 第2个Observable发送的每1个数据进行相加
}
}).subscribe(new Consumer<Long>() {
@Override
public void accept(Long s) throws Exception {
System.out.println("合并的结果是: "+s);
}
});
try {
Thread.sleep(4000);
} catch (InterruptedException e) {
e.printStackTrace();
}
输出
合并的数据是: 0 0
合并的结果是: 0
合并的数据是: 1 0
合并的结果是: 1
合并的数据是: 1 1
合并的结果是: 2
合并的数据是: 2 1
合并的结果是: 3
合并的数据是: 2 2
合并的结果是: 4
0 1 2
0 1 2
0 1 2 3 4
6、startWith() / startWithArray()
在一个被观察者发送事件前,追加发送一些数据 / 一个新的被观察者
Observable.just(4, 5, 6)
.startWith(0) // 追加单个数据 = startWith()
.startWithArray(1, 2, 3) // 追加多个数据 = startWithArray()
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
System.out.println("接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
System.out.println("对Error事件作出响应" );
}
@Override
public void onComplete() {
System.out.println("对Complete事件作出响应");
}
});
输出
接收到了事件1
接收到了事件2
接收到了事件3
接收到了事件0
接收到了事件4
接收到了事件5
接收到了事件6
对Complete事件作出响应
Observable.just(4, 5, 6)
.startWith(Observable.just(1, 2, 3))
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
System.out.println("接收到了事件"+ value );
}
@Override
public void onError(Throwable e) {
System.out.println("对Error事件作出响应" );
}
@Override
public void onComplete() {
System.out.println("对Complete事件作出响应");
}
});
7、count()
统计被观察者发送事件的数量
// 注:返回结果 = Long类型
Observable.just(1, 2, 3, 4)
.count()
.subscribe(new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("发送的事件数量 = "+aLong);
}
});
输出
发送的事件数量 = 4
参考
1、Carson带你学Android:RxJava组合/合并操作符
2、Rxjava2 : 合并操作符
3、Android RxJava2(三)组合操作符
以上是关于Android :RxJava学习笔记之合并操作符的主要内容,如果未能解决你的问题,请参考以下文章