RxJava操作符 __结合操作
Posted microhex
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RxJava操作符 __结合操作相关的知识,希望对你有一定的参考价值。
本次学习的内容是关于多个Observable之间的组合,比较有用和有趣。
两年前,我写了
RxJava操作符(一) __创建操作
RxJava操作符(二) __变换操作
RxJava操作符(三) __过滤操作
今天项目里需要用到一些操作符,顺便学习了一下。
startWith
在数据序列开头插入一条指定的项
比较好理解啊,看代码就知道了:
Observable.
range(1, 5).
startWith(-1).
subscribe(getObserver()) ;
看图结果:
我们看到了-1被插入到了最前面,那么startWith就是这个意思。
concatWith
当然了,既然有了startWith,那么就一定有endWith啊,我也不知道为啥RxJava开发组的人员不写个endWith,而使用了concatWith这个名词,不过也说得过来吧,concat有组合串联的意思,更有绵延不断的感觉,所以concatWith比endWith描述的更充分吧。concatWith接受的参数是能产生序列的Observable,看代码:
Observable.
range(1, 2).
concatWith(Observable.just(100,1000,10000)).
concatWith(Observable.range(20000,4)).
subscribe(getObserver()) ;
看看我们的结果:
Merge
用过svn和git的童靴,一定知道Merge,就是合并你我的代码。同样在这里也是合并多个Observable的发射物。多个Observable通过merge发射,可能是按照顺序的,也有可能是交错进行的,这一点是zip操作符有区别的。
Merge方法是静态的,所以它存在多种写法:
Observable<String> source1 = Observable.just("one","two","three");
Observable<String> source2 = Observable.just("1","2","3");
//通过静态方法调用 //Observable.merge(source1,source2).subscribe(getObserver());
//通过对象方法调用 source1.mergeWith(source2).subscribe(getObserver());
所得的结果为:
可以看到我们的发射结果是按照顺序进行的,下面就来个交替发射的,这就要通过我们的多线程去实现了,先定义一个随机休眠的Thread,并返回Observable:
public Observable<String> getRandomObservable(int index)
return Observable.create(new OnSubscribe<String>()
@Override
public void call(Subscriber<? super String> t)
new Thread()
public void run()
for(int i = 0 ; i< 5 ; i++)
try
//随机休眠
sleep(mRandom.nextInt(200) + 10);
//休眠完成之后,进行发射
t.onNext(index + "_" + i);
catch (Exception e)
e.printStackTrace();
t.onCompleted();
;
.start();
);
然后使用merge进行操作:
@Test
public void mergeFunction2() throws IOException
Observable.
merge(getRandomObservable(1) , getRandomObservable(2)).
subscribe(getObserver());
System.in.read();
由于使用的是JavaProject,所以这里使用了System.in.read()来阻塞主线程,等待我们的子线程休眠后进行的各种操作,那么我们的结果为:
显然在图中我们可以看出位于不同的Observable发射的数据是交替进行的。
mergeDelayError
mergeDelayError来自merge,在多个Observable进行merge的时候,如果没有发生异常,那么它的效果与merge是一样的,但是如果Observable在发射的过程中,出现了error情况,merge与mergeDelayError处理情况是不一致的:merge会终止所有的Observable的发射,并以onError结尾;mergeDelayError会保存这个错误,等待其它Observable发射完成之后,在最后才发射错误信息。
处理一下我们的getRandomObservable代码:
public Observable<String> getRandomObservable(int index)
return Observable.create(new OnSubscribe<String>()
@Override
public void call(Subscriber<? super String> t)
new Thread()
public void run()
for(int i = 0 ; i< 5 ; i++)
try
sleep(mRandom.nextInt(200) + 10);
//这里使用index作为除数,
t.onNext(index + "_" + (i / index));
catch (Exception e)
//e.printStackTrace();
t.onError(e);
t.onCompleted();
;
.start();
);
这里分别使用merge和mergeDelayError进行处理:
Observable.
merge(getRandomObservable(0) , getRandomObservable(2)).
subscribe(getObserver());
Observable.
mergeDelayError(getRandomObservable(0) , getRandomObservable(2)).
subscribe(getObserver());
得出的结果为:
从这里,我们看出了merge与mergeDelayError的区别了吧。。
zip
通过一个函数将多个Observable发射物结合起来,基于这个函数的结果为每个结合体发射单个数据项,那么就意味着发射的数据项的多少取决于所有Observable最少发射项的那个。还有,只有当所有的Observable都发射数据之后,zip操作才会进行,否则会进行阻塞,这一点需要注意。举个例子:
现在每个Observable中都发射三个,看看它们的数据结合体:
List<Observable<Integer>> _list = new ArrayList<Observable<Integer>>();
for(int i = 1 ; i < 4 ; i++)
_list.add(Observable.range(i*10, 3));
Observable.zip(_list,new FuncN<String>()
@Override
public String call(Object... args)
String target = "" ;
for(Object obj : args)
target += "_" + obj.toString() ;
return target;
).
subscribe(getObserver());
来看看结果:
发射了三个,现在来改一下代码:
or(int i = 1 ; i < 4 ; i++)
_list.add(Observable.range(i*10, 5 - i));
再看看结果:
现在只有两项,因为Observable中发射项最少的只有两个。
在项目中,使用的比较频繁的,我觉得就是它了。可能是公司的接口和业务没有很好的衔接,很多时候一张页面要获取不同的接口来展示,加上获取接口不同时间的长短,这个zip函数都能很好的解决我们的问题。当然了,这个治标不治本,最重要的还是后台接口要好,才能使我们的程序更稳定。
#CombineLatest
当两个Observable中任何一个发射了数据时,使用一个函数【即我们的FuncN函数】结合每个Observable发射的最近数据项,并且基于这个函数的结果发射数据。不懂?好吧,起初我也是这么想的,先看看官方给的图吧:
从左到右是时间轴,加上上面的时间线为A,下面的为a,那么可以看到A5线发射,A5发射之后需要等待aD发射,发射完成之后,A5与aD结合,通过funcN函数获取二者的结果,5D并再次发射。
CombineLast操作符类似于zip,但是只有当原始的Observable中的每一个都发射了一条数据时zip才发射数据。CombineLast则在原始的Observable中任意一个发射了数据时,CommLast使用一个函数结合它们最近发射的数据,然后发射这个函数的返回值。
这个就不写例子了,感觉自己的例子都不是特别好,没有一个很好的规律可循,以后找到了再加上吧。
Join
官方定义为:任何时候,只有在另一个Observable发射的数据定义的时间窗口内【这里的时间窗口是我们定义的针对每条数据特定的原则】,这个Observable发射的一条数据,就结合两个Observable发射的数据。
不懂?? 我也是啊,那么就看看join函数吧:
join(Observable<TRight> right,
Func1<T, Observable<TLeftDuration>> leftDurationSelector,
Func1<TRight, Observable<TRightDuration>> rightDurationSelector,
Func2<T, TRight, R> resultSelector)
第一个参数right可以理解为需要组合的Observable,就是我们的目标Observable吧;
第二个参数Func1可以理解为从源Observable发射出来的数据,并返回一个新的Observable,这个Func1定义了自己的数据窗口,说白了就是定义自己的数据是否有效;
第三个参数Func1可以理解为从目标Observable发射出来的数据,并返回一个新的Observable,这个Func2定义了自己的数据窗口,就是定义了自己的数据有效期;
第四个参数Func2可以理解为源新产生的Observable和目标产生的Observable进行合并的结果。
来举个例子,先看代码,先生成一个按时发射的Observable:
private Observable<Integer> getOnTimeObservable(int indexTime)
return Observable.
interval(indexTime, TimeUnit.SECONDS).
take(5).
map(new Func1<Long, Integer>()
@Override
public Integer call(Long t)
return (int) (t - 0) ;
) ;
来看我们的主体函数:
@Test
public void joinFunction() throws IOException
//每隔1秒发射
Observable<Integer> source1 = getOnTimeObservable(1);
//每隔2秒发射
Observable<Integer> source2 = getOnTimeObservable(2);
source1.join(source2,
new Func1<Integer,Observable<Long>>()
@Override
public Observable<Long> call(Integer t)
//每隔1秒检查数据是否有效
return Observable.interval(1, TimeUnit.SECONDS);
,
new Func1<Integer,Observable<Long>>()
@Override
public Observable<Long> call(Integer t)
//每隔3秒检查数据是否有效
return Observable.interval(3, TimeUnit.SECONDS);
, new Func2<Integer,Integer,String>()
@Override
public String call(Integer t1, Integer t2)
//返回数据
return t1 + "__" + t2;
).subscribe(getObserver());
System.in.read();
先看打印的结果,然后再分析一下原因:
switchOnNext
它需要的参数是可以发射一堆Observable
的Observable
,但是它只发射最新Observable
中发射的值,以前的,旧的Observable
中发射的值都将被丢弃。
暂时没有发现好的例子,直接上他人的demo:
Observable.
switchOnNext(Observable.interval(2,TimeUnit.SECONDS).
map(new Func1<Long, Observable<?>>()
@Override
public Observable<?> call(Long aLong)
return Observable.interval(1,TimeUnit.SECONDS);
)).subscribe(new Action1<Object>()
@Override
public void call(Object o)
System.out.println("object:" + o );
);
System.in.read();
说说自己的理解,外层的框架内每隔2s生成一个新的Observable
,每个新的Observable
每隔1s发射一个数字,然后执行程序,结果如下:
大致可以理解为下图:
某个时间段内取最新的Observable
中发射的值,其他发射的值忽略.
以上是关于RxJava操作符 __结合操作的主要内容,如果未能解决你的问题,请参考以下文章