RXJAVA-concat
Posted 征服.刘华强
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RXJAVA-concat相关的知识,希望对你有一定的参考价值。
concat操作符可以连接俩个
Observable,只有第一个Observable调用了onComplete后,才会触发第二个Observable。
比如在读取数据时,先查询缓存,缓存存在直接处理,不存在查询数据库,然后在处理。
package com.netty.demo.vertx;
import io.reactivex.*;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Subscriber;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;
@Slf4j
public class RxJavaTest
public static void main(String[] args) throws InterruptedException
Observable o1 = Observable.create(new ObservableOnSubscribe<Integer>()
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception
//查询缓存数据
Integer cacheData = Integer.MAX_VALUE;
if (cacheData != null)
//缓存不为空则直接传递给观察者
emitter.onNext(cacheData);
else
//缓存为空则调用onComplete,触发第二个(o2)的执行逻辑
emitter.onComplete();
);
Observable o2 = Observable.create(new ObservableOnSubscribe<Integer>()
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception
//查询数据库
Integer cacheData = Integer.MAX_VALUE;
//传递给观察者
emitter.onNext(cacheData);
);
Observable.concat(o1, o2).subscribe(new Observer<Integer>()
@Override
public void onSubscribe(Disposable d)
log.info("onSubscribe");
@Override
public void onNext(Integer o)
log.info(o.toString());
@Override
public void onError(Throwable e)
log.info("onError");
@Override
public void onComplete()
log.info("onComplete");
);
Thread.sleep(300000000);
以上是关于RXJAVA-concat的主要内容,如果未能解决你的问题,请参考以下文章