Reactive Stream: 如何将两个数据流接到一起,然后进行操作

Posted 小曲isme

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Reactive Stream: 如何将两个数据流接到一起,然后进行操作相关的知识,希望对你有一定的参考价值。

如何将两个数据流接到一起,然后进行操作

Flux是Project Reactor中的概念。

一个需求

我有两个数据流的源头,想要把他们合并到一起 然后组合成一个新流去返回。

思路一

我将两个flux流转化为mono,在其中一个流中进行一个flatMap操作,然后将两个流连接到一起。

看代码

@Test
void name() {
    final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
    final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
    final List<Integer> totalList = oddSource.collectList()
        .flatMap(oddList -> evenSource.collectList().map(evenList -> {
            evenList.addAll(oddList);
            return evenList;
        })).block();
    assertThat(totalList)
        .containsExactly(2, 4, 6, 8, 10, 1, 3, 5, 7, 9)
        .size()
        .isEqualTo(10);

}

思路二

感觉这个是骚操作。并且我流中的数据顺序并不是我期望的那样:第一个流中的数据,应该在前面
于是找到了另外一个操作符concatWith,并且保序是我需要的

看代码

@Test
void name1() {
    final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
    final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
    final List<Integer> totalList = oddSource.concatWith(evenSource)
        .collectList().block();

    assertThat(totalList)
        .containsExactly(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
        .size()
        .isEqualTo(10);

}

后边发现需求又变了:

我有四个流,然后要将这个四个流合并到一起,然后去返回。

三种方法

  • 将flux换成mono,然后多进行几次flatMap操作
  • 多次调用flux的实例方法concatWith
  • 调用flux的静态方法concat

方法一和方法二大致和前边两种写法一致,

方法三
@Test
void name2() {
    final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
    final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
    final List<Integer> totalList = Flux.concat(oddSource,evenSource)
        .collectList().block();

    assertThat(totalList)
        .containsExactly(1, 3, 5, 7, 9, 2, 4, 6, 8, 10)
        .size()
        .isEqualTo(10); 

}

Flux.concat方法接收的是比变长参数就不展示了,可以看下该方法的源码

该方法接受的是publisher类型的参数,所以Mono的source也可以接上来

代码

@Test
void test3() {
    final Flux<Integer> oddSource = Flux.fromIterable(Arrays.asList(1, 3, 5, 7, 9));
    final Flux<Integer> evenSource = Flux.fromIterable(Arrays.asList(2, 4, 6, 8, 10));
    final Mono<Integer> monoSource = Mono.just(11);
    final List<Integer> totalList = Flux.concat(oddSource,evenSource, monoSource)
        .collectList().block();

    assertThat(totalList)
        .containsExactly(1, 3, 5, 7, 9, 2, 4, 6, 8, 10,11)
        .size()
        .isEqualTo(11);

}

其实这种reactive stream 和java的stream有很多相似之处,java如何将两个stream连接到一起java 8提供了Stream.concat()一样的思想,java 8 的stream用的多了,一些流里操作符也就很容易对应到Reactive Stream 上了。

Source reop: https://github.com/1483523635/blogs/blob/master/java/reactive-streaming/flux.md

以上是关于Reactive Stream: 如何将两个数据流接到一起,然后进行操作的主要内容,如果未能解决你的问题,请参考以下文章

FunDA- Stream Source:reactive data streams

JDK9新特性 Reactive Stream 响应式流

Java9响应式编程Reactive Stream

失败时优雅地重启 Reactive-Kafka Consumer Stream

Vue 3 之:弄清 ref reactive toRef toRefs

vue3.0 reactive函数