reactor之操作符

Posted aofengdaxia

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了reactor之操作符相关的知识,希望对你有一定的参考价值。

什么是操作符

为了方便对Flux/Mono的数据操作,减少自定义subscriber的代码量,reactor提供了操作符的概念,操作符可以对Flux/Mono的数据进行转换、过滤、合并等操作。操作符的很多名称和概念和java8的Stream API是类似的,比如map、filter、flatMap、reduce等。为了减少学习操作符的成本,建议大家先学习java8的Stream API,然后再学习reactor的操作符。

reactor中操作符比较众多,总得来说可以分为对数据单个元素进行操作,对多个元素进行操作,对整个数据源进行操作,处理错误异常,对整个数据流的生命周期进行操作几大类。
大概的理解什么情况下使用哪一类操作即可,没必要去刻意记忆或者理解每个操作,用到的时候去查询即可。

常见的操作符

简单的操作符

map,filter, flatMap, reduce等常见的流操作

map,filter,flatMap,reduce等操作和java 8的流操作类似,这里不再赘述。

Flux<Integer> flux=Flux.just(1,2,3)
        .map(i->i*2)
        .map(i->i+1);
Flux.just("a","b","c").filter(s->s.equals("a"))
//flatMap
Flux.just("a","b","c").flatMap(s->Flux.just(s.toUpperCase()))
//reduce
Flux.just(1,2,3).reduce(0,(a,b)->a+b)

合并类操作符

zipWith

zipWith 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素按照顺序一一对应,然后将两个元素组合成一个元素。如果两个流的长度不一致,那么最终合并成的流的长度就是两个流中长度较短的那个流的长度。

Flux.zip(Flux.just("a","b","c"),Flux.just("d","e","f"))
        .subscribe(System.out::println);
Flux.just("a","b","c").zipWith(Flux.just("d","e","f"))
        .subscribe(System.out::println);
// 上面两种方法等同
//输出为:[a,d] [b,e] [c,f]

mergeWith

mergeWith 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素交替地放入到合并后的流中。同时运行,根据时间先后运行。

Flux.merge(Flux.just("a","b","c"),Flux.just("d","e","f"))
        .subscribe(System.out::println);
Flux.just("a","b","c").mergeWith(Flux.just("d","e","f")).subscribe(System.out::println);
// 上面两种方法等同
//输出为:a b c d e f

concatWith

concatWith 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素按照顺序放入到合并后的流中。按照顺序分别运行,flux1运行完成以后再运行flux2。

Flux.concat(Flux.just("a","b","c"),Flux.just("d","e","f"))
        .subscribe(System.out::println);
Flux.just("a","b","c").concatWith(Flux.just("d","e","f")).subscribe(System.out::println);
// 上面两种方法等同
//输出为:a b c d e f

统计和判断类

count, all, any, hasElements

count,all,any,hasElements 这四个操作符都是用来判断流中的元素是否满足某个条件的。和java 8 中的 Stream 中的 count,allMatch,anyMatch,findAny,findFirst 方法类似。

count 操作符可以统计流中元素的个数。
all 操作符可以判断流中的所有元素是否都满足某个条件。
any 操作符可以判断流中是否存在满足某个条件的元素。
hasElements 操作符可以判断流中是否存在元素。

```java
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .count()
        .subscribe(System.out::println);
//输出为:10
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                .all(i -> i > 0)
                .subscribe(System.out::println);
//输出为:true
            
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                    .any(i -> i > 5)
                    .subscribe(System.out::println);
//输出为:true
                    
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                            .hasElements()
                            .subscribe(System.out::println);
//输出为:true
        

取出和跳过元素

take, takeLast, takeUntil, takeWhile

take, takeLast, takeUntil, takeWhile 这四个操作符都是用来从流中取出元素的。和 java 8 中的 Stream 中的 limit,skip,findFirst,findAny 方法类似。
take 操作符可以从流中取出指定个数的元素。
takeLast 操作符可以从流中取出最后指定个数的元素。
takeUntil 操作符可以从流中取出元素直到满足某个条件。
takeWhile 操作符可以从流中取出元素直到不满足某个条件。

Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .take(5)
        .subscribe(System.out::println);
//输出为:1 2 3 4 5
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                    .takeLast(5)
                    .subscribe(System.out::println);
//输出为:6 7 8 9 10   
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                    .takeUntil(i -> i > 5)
                    .subscribe(System.out::println);
//输出为:1 2 3 4 5 6
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                        .takeWhile(i -> i < 5)
                        .subscribe(System.out::println);
//输出为:1 2 3 4

skip, skipLast, skipUntil, skipWhile

skip, skipLast, skipUntil, skipWhile 这四个操作符都是用来跳过流中的元素的。和 java 8 中的 Stream 中的 limit,skip,findFirst,findAny 方法类似。
skip 操作符可以跳过流中指定个数的元素。
skipLast 操作符可以跳过流中最后指定个数的元素。
skipUntil 操作符可以跳过流中元素直到满足某个条件。
skipWhile 操作符可以跳过流中元素直到不满足某个条件。

Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .skip(5)
        .subscribe(System.out::println);
//输出为:6 7 8 9 10
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                    .skipLast(5)
                    .subscribe(System.out::println);
//输出为:1 2 3 4 5
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                        .skipUntil(i -> i > 5)
                        .subscribe(System.out::println);
//输出为:6 7 8 9 10
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                                .skipWhile(i -> i < 5)
                                .subscribe(System.out::println);        
//输出为:5 6 7 8 9 10

去重操作

distinct, distinctUntilChanged

distinct, distinctUntilChanged 这两个操作符都是用来去重的。和 java 8 中的 Stream 中的 distinct 方法类似。
distinct 操作符可以去除流中重复的元素。
distinctUntilChanged 操作符可以去除流中连续重复的元素。

Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5)
        .distinct()
        .subscribe(System.out::println);
//输出为:1 2 3 4 5 6 7 8 9 10
Flux.just(1, 2, 2, 3, 3, 4, 5, 5, 6, 7, 8, 9, 10)
                        .distinctUntilChanged()
                        .subscribe(System.out::println);
//输出为:1 2 3 4 5 6 7 8 9 10

转化为collection或Map的操作

collectList

collectList 操作符可以将流中的元素收集到一个 List 中。

Flux.just("a","b","c").collectList().subscribe(System.out::println);
//输出为:[a, b, c]

collectMap

collectMap 操作符可以将流中的元素收集到一个 Map 中。

Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .collectMap(i -> i % 2 == 0 ? "even" : "odd", i -> i)
        .subscribe(System.out::println);
//输出为:odd= 9, even=10

collectSortedList

collectSortedList 操作符可以将流中的元素收集到一个有序的 List 中。

Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .collectSortedList()
        .subscribe(System.out::println);
//输出为:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

collectMultimap

collectMultimap 操作符可以将流中的元素收集到一个 Multimap 中。

Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .collectMultimap(i -> i % 2 == 0 ? "even" : "odd")
        .subscribe(System.out::println);
//输出为:odd=[1, 3, 5, 7, 9], even=[2, 4, 6, 8, 10]

对空值的处理

defaultIfEmpty

defaultIfEmpty 操作符可以在流中没有元素时发出一个默认值。

Flux.empty()
        .defaultIfEmpty("default")
        .subscribe(System.out::println);
//输出为:default

switchIfEmpty

switchIfEmpty 操作符可以在流中没有元素时切换到另一个流。

Flux.empty()
        .switchIfEmpty(Flux.just(1, 2, 3))
        .subscribe(System.out::println);
//输出为:1 2 3

对错误的处理

onErrorReturn

onErrorReturn 操作符可以在流中发生错误时发出一个默认值。

Flux.just(1, 2, 3)
        .concatWith(Mono.error(new RuntimeException()))
        .onErrorReturn(0)
        .subscribe(System.out::println);
//输出为:1 2 3 0

onErrorResume

onErrorResume 操作符可以在流中发生错误时切换到另一个流。

Flux.just(1, 2, 3)
        .concatWith(Mono.error(new RuntimeException()))
        .onErrorResume(e -> Flux.just(4, 5, 6))
        .subscribe(System.out::println);
//输出为:1 2 3 4 5 6

onErrorContinue

onErrorContinue 操作符可以在流中发生错误时继续发出流中的元素。

Flux.just(1, 2, 3)
        .concatWith(Mono.error(new RuntimeException()))
        .onErrorContinue((e, o) -> System.out.println("error: " + e.getMessage() + ", object: " + o))
        .subscribe(System.out::println);
//输出为:1 2 3 error: java.lang.RuntimeException, object: 4

retry

retry 操作符可以在流中发生错误时重试。

Flux.just(1, 2, 3)
        .concatWith(Mono.error(new RuntimeException()))
        .retry(1)
        .subscribe(System.out::println);
//输出为:1 2 3 1 2 3

retryWhen

retryWhen 操作符可以在流中发生错误时根据指定的条件重试。

Flux.just(1, 2, 3)
        .concatWith(Mono.error(new RuntimeException()))
        .retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1)))
        .subscribe(System.out::println);
//输出为:1 2 3 1 2 3 1 2 3

timeout

timeout 操作符可以在流中发生超时时发出一个默认值。

Flux.just(1, 2, 3)
        .delayElements(Duration.ofSeconds(1))
        .timeout(Duration.ofMillis(500), Mono.just(0))
        .subscribe(System.out::println);
//输出为:1 2 3 0

timeoutTo

timeoutTo 操作符可以在流中发生超时时切换到另一个流。

Flux.just(1, 2, 3)
        .delayElements(Duration.ofSeconds(1))
        .timeout(Duration.ofMillis(500), Flux.just(JavaScrip之BOMday0914

高性能 I/O 设计模式之Reactor

Java面试题之谈谈reactor模型

附2:Reactor 3 之选择合适的操作符——响应式Spring的道法术器

reactor之hooks

IO设计模式之Reactor和Proactor