RX-JAVA distinct debounce defer merge

Posted 征服.刘华强

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RX-JAVA distinct debounce defer merge相关的知识,希望对你有一定的参考价值。

distinct 去重

        Observable.just(1, 1, 1, 2, 2, 3, 4, 5)
                .distinct()
                .subscribe(new Consumer<Integer>() 
                    @Override
                    public void accept(Integer integer) throws Exception 
                        log.info("distinct : " + integer);
                    
                );
        Thread.sleep(300000000);

debounce 去除发送频率过快的项,2个发送项的间隔要大于500毫秒,比如1发完后400毫秒发送2,那么1不会被发送到下游,  2发送完后505毫秒发送3,那么2会发送到下游。

 Observable.create(new ObservableOnSubscribe<Integer>() 
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception 
                // send events with simulated time wait
                emitter.onNext(1); // skip
                Thread.sleep(400);
                emitter.onNext(2); // deliver
                Thread.sleep(505);
                emitter.onNext(3); // skip
                Thread.sleep(100);
                emitter.onNext(4); // deliver
                Thread.sleep(605);
                emitter.onNext(5); // deliver
                Thread.sleep(510);
                emitter.onComplete();
            
        ).debounce(500, TimeUnit.MILLISECONDS)
                .subscribeOn(Schedulers.io())
                .observeOn(Schedulers.newThread())
                .subscribe(new Consumer<Integer>() 
                    @Override
                    public void accept(Integer integer) throws Exception 
                        log.info("debounce :" + integer);
                    
                );
        Thread.sleep(300000000);

 

defer 简单地时候就是每次订阅都会创建一个新的 Observable,并且如果没有被订阅,就不会产生新的 Observable

Observable<Integer> observable = Observable.defer(new Callable<ObservableSource<Integer>>() 
            @Override
            public ObservableSource<Integer> call() throws Exception 
                log.info("call -create observable");
                return Observable.just(1, 2, 3);
            
        );

        observable.subscribe(new Consumer<Integer>() 
            @Override
            public void accept(Integer integer) throws Exception 
                log.info("accpent:" + integer);
            
        );

        observable.subscribe(new Consumer<Integer>() 
            @Override
            public void accept(Integer integer) throws Exception 
                log.info("accpent:" + integer);
            
        );
2021-02-24 10:53:24 [main] INFO  c.n.d.v.RxJavaTest:call - call -create observable
2021-02-24 10:53:24 [main] INFO  c.n.d.v.RxJavaTest:accept - accpent:1
2021-02-24 10:53:24 [main] INFO  c.n.d.v.RxJavaTest:accept - accpent:2
2021-02-24 10:53:24 [main] INFO  c.n.d.v.RxJavaTest:accept - accpent:3
2021-02-24 10:53:24 [main] INFO  c.n.d.v.RxJavaTest:call - call -create observable
2021-02-24 10:53:24 [main] INFO  c.n.d.v.RxJavaTest:accept - accpent:1
2021-02-24 10:53:24 [main] INFO  c.n.d.v.RxJavaTest:accept - accpent:2
2021-02-24 10:53:24 [main] INFO  c.n.d.v.RxJavaTest:accept - accpent:3

merge 顾名思义,熟悉版本控制工具的你一定不会不知道 merge 命令,而在 Rx 操作符中,merge 的作用是把多个 Observable 结合起来,接受可变参数,也支持迭代器集合。注意它和 concat 的区别在于,不用等到 发射器 A 发送完所有的事件再进行发射器 B 的发送。

        Observable.merge(Observable.just(1, 2, 3, 4), Observable.just(5, 6, 7, 8))
                .subscribeOn(Schedulers.newThread())
                .observeOn(Schedulers.io())
                .subscribe(new Consumer<Integer>() 
                    @Override
                    public void accept(Integer integer) throws Exception 
                        log.info("accept: merge :" + integer);
                    
                );
        Thread.sleep(3000000);

 

 

 

 

 

 

以上是关于RX-JAVA distinct debounce defer merge的主要内容,如果未能解决你的问题,请参考以下文章

RX-JAVA distinct debounce defer merge

如何使用rx-java尾部文件?

Lodash 之 debounce

Flutter: debounce 避免高频率事件

bind, debounce 突然报错 Expect a function 以及debounce 不生效

js 消抖(debounce)与节流(throttle)