RXJAVA-concatMap

Posted 征服.刘华强

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RXJAVA-concatMap相关的知识,希望对你有一定的参考价值。

与flatMap类似,concatMap它和flatMap的作用几乎一模一样, 只是它的结果是严格按照上游发送的顺序来发送的, 来看个代码吧:

package com.netty.demo.vertx;

import io.reactivex.*;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.schedulers.Schedulers;
import lombok.extern.slf4j.Slf4j;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.TimeUnit;

@Slf4j
public class RxJavaTest 

    public static void main(String[] args) throws InterruptedException 

        Observable.create(new ObservableOnSubscribe<Integer>() 
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception 
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
            
        ).observeOn(Schedulers.newThread()).subscribeOn(Schedulers.io())
        .concatMap(new Function<Integer, ObservableSource<String>>() 
            @Override
            public ObservableSource<String> apply(Integer integer) throws Exception 
                final List<String> list = new ArrayList<>();
                for (int i = 0; i < 3; i++) 
                    list.add("I am value " + integer);
                
                log.info(Arrays.toString(list.toArray()));
                return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS);
            
        ).subscribe(new Consumer<String>() 
            @Override
            public void accept(String s) throws Exception 
                log.info(s);
            
        );

        Thread.sleep(300000000);
    

2021-02-22 15:46:02 [RxNewThreadScheduler-1] INFO  c.n.d.v.RxJavaTest:apply - [I am value 1, I am value 1, I am value 1]
2021-02-22 15:46:02 [RxComputationThreadPool-1] INFO  c.n.d.v.RxJavaTest:accept - I am value 1
2021-02-22 15:46:02 [RxComputationThreadPool-1] INFO  c.n.d.v.RxJavaTest:accept - I am value 1
2021-02-22 15:46:02 [RxComputationThreadPool-1] INFO  c.n.d.v.RxJavaTest:accept - I am value 1
2021-02-22 15:46:02 [RxComputationThreadPool-1] INFO  c.n.d.v.RxJavaTest:apply - [I am value 2, I am value 2, I am value 2]
2021-02-22 15:46:02 [RxComputationThreadPool-2] INFO  c.n.d.v.RxJavaTest:accept - I am value 2
2021-02-22 15:46:02 [RxComputationThreadPool-2] INFO  c.n.d.v.RxJavaTest:accept - I am value 2
2021-02-22 15:46:02 [RxComputationThreadPool-2] INFO  c.n.d.v.RxJavaTest:accept - I am value 2
2021-02-22 15:46:02 [RxComputationThreadPool-2] INFO  c.n.d.v.RxJavaTest:apply - [I am value 3, I am value 3, I am value 3]
2021-02-22 15:46:02 [RxComputationThreadPool-3] INFO  c.n.d.v.RxJavaTest:accept - I am value 3
2021-02-22 15:46:02 [RxComputationThreadPool-3] INFO  c.n.d.v.RxJavaTest:accept - I am value 3
2021-02-22 15:46:02 [RxComputationThreadPool-3] INFO  c.n.d.v.RxJavaTest:accept - I am value 3

以上是关于RXJAVA-concatMap的主要内容,如果未能解决你的问题,请参考以下文章

RXJAVA-concatMap