reactor之操作符
Posted aofengdaxia
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了reactor之操作符相关的知识,希望对你有一定的参考价值。
什么是操作符
为了方便对Flux/Mono的数据操作,减少自定义subscriber的代码量,reactor提供了操作符的概念,操作符可以对Flux/Mono的数据进行转换、过滤、合并等操作。操作符的很多名称和概念和java8的Stream API是类似的,比如map、filter、flatMap、reduce等。为了减少学习操作符的成本,建议大家先学习java8的Stream API,然后再学习reactor的操作符。
reactor中操作符比较众多,总得来说可以分为对数据单个元素进行操作,对多个元素进行操作,对整个数据源进行操作,处理错误异常,对整个数据流的生命周期进行操作几大类。
大概的理解什么情况下使用哪一类操作即可,没必要去刻意记忆或者理解每个操作,用到的时候去查询即可。
常见的操作符
简单的操作符
map,filter, flatMap, reduce等常见的流操作
map,filter,flatMap,reduce等操作和java 8的流操作类似,这里不再赘述。
Flux<Integer> flux=Flux.just(1,2,3)
.map(i->i*2)
.map(i->i+1);
Flux.just("a","b","c").filter(s->s.equals("a"))
//flatMap
Flux.just("a","b","c").flatMap(s->Flux.just(s.toUpperCase()))
//reduce
Flux.just(1,2,3).reduce(0,(a,b)->a+b)
合并类操作符
zipWith
zipWith 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素按照顺序一一对应,然后将两个元素组合成一个元素。如果两个流的长度不一致,那么最终合并成的流的长度就是两个流中长度较短的那个流的长度。
Flux.zip(Flux.just("a","b","c"),Flux.just("d","e","f"))
.subscribe(System.out::println);
Flux.just("a","b","c").zipWith(Flux.just("d","e","f"))
.subscribe(System.out::println);
// 上面两种方法等同
//输出为:[a,d] [b,e] [c,f]
mergeWith
mergeWith 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素交替地放入到合并后的流中。同时运行,根据时间先后运行。
Flux.merge(Flux.just("a","b","c"),Flux.just("d","e","f"))
.subscribe(System.out::println);
Flux.just("a","b","c").mergeWith(Flux.just("d","e","f")).subscribe(System.out::println);
// 上面两种方法等同
//输出为:a b c d e f
concatWith
concatWith 操作符可以将两个流合并成一个流,合并的方式是将两个流中的元素按照顺序放入到合并后的流中。按照顺序分别运行,flux1运行完成以后再运行flux2。
Flux.concat(Flux.just("a","b","c"),Flux.just("d","e","f"))
.subscribe(System.out::println);
Flux.just("a","b","c").concatWith(Flux.just("d","e","f")).subscribe(System.out::println);
// 上面两种方法等同
//输出为:a b c d e f
统计和判断类
count, all, any, hasElements
count,all,any,hasElements 这四个操作符都是用来判断流中的元素是否满足某个条件的。和java 8 中的 Stream 中的 count,allMatch,anyMatch,findAny,findFirst 方法类似。
count 操作符可以统计流中元素的个数。
all 操作符可以判断流中的所有元素是否都满足某个条件。
any 操作符可以判断流中是否存在满足某个条件的元素。
hasElements 操作符可以判断流中是否存在元素。
```java
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.count()
.subscribe(System.out::println);
//输出为:10
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.all(i -> i > 0)
.subscribe(System.out::println);
//输出为:true
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.any(i -> i > 5)
.subscribe(System.out::println);
//输出为:true
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.hasElements()
.subscribe(System.out::println);
//输出为:true
取出和跳过元素
take, takeLast, takeUntil, takeWhile
take, takeLast, takeUntil, takeWhile 这四个操作符都是用来从流中取出元素的。和 java 8 中的 Stream 中的 limit,skip,findFirst,findAny 方法类似。
take 操作符可以从流中取出指定个数的元素。
takeLast 操作符可以从流中取出最后指定个数的元素。
takeUntil 操作符可以从流中取出元素直到满足某个条件。
takeWhile 操作符可以从流中取出元素直到不满足某个条件。
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.take(5)
.subscribe(System.out::println);
//输出为:1 2 3 4 5
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.takeLast(5)
.subscribe(System.out::println);
//输出为:6 7 8 9 10
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.takeUntil(i -> i > 5)
.subscribe(System.out::println);
//输出为:1 2 3 4 5 6
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.takeWhile(i -> i < 5)
.subscribe(System.out::println);
//输出为:1 2 3 4
skip, skipLast, skipUntil, skipWhile
skip, skipLast, skipUntil, skipWhile 这四个操作符都是用来跳过流中的元素的。和 java 8 中的 Stream 中的 limit,skip,findFirst,findAny 方法类似。
skip 操作符可以跳过流中指定个数的元素。
skipLast 操作符可以跳过流中最后指定个数的元素。
skipUntil 操作符可以跳过流中元素直到满足某个条件。
skipWhile 操作符可以跳过流中元素直到不满足某个条件。
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.skip(5)
.subscribe(System.out::println);
//输出为:6 7 8 9 10
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.skipLast(5)
.subscribe(System.out::println);
//输出为:1 2 3 4 5
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.skipUntil(i -> i > 5)
.subscribe(System.out::println);
//输出为:6 7 8 9 10
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.skipWhile(i -> i < 5)
.subscribe(System.out::println);
//输出为:5 6 7 8 9 10
去重操作
distinct, distinctUntilChanged
distinct, distinctUntilChanged 这两个操作符都是用来去重的。和 java 8 中的 Stream 中的 distinct 方法类似。
distinct 操作符可以去除流中重复的元素。
distinctUntilChanged 操作符可以去除流中连续重复的元素。
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 1, 2, 3, 4, 5)
.distinct()
.subscribe(System.out::println);
//输出为:1 2 3 4 5 6 7 8 9 10
Flux.just(1, 2, 2, 3, 3, 4, 5, 5, 6, 7, 8, 9, 10)
.distinctUntilChanged()
.subscribe(System.out::println);
//输出为:1 2 3 4 5 6 7 8 9 10
转化为collection或Map的操作
collectList
collectList 操作符可以将流中的元素收集到一个 List 中。
Flux.just("a","b","c").collectList().subscribe(System.out::println);
//输出为:[a, b, c]
collectMap
collectMap 操作符可以将流中的元素收集到一个 Map 中。
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.collectMap(i -> i % 2 == 0 ? "even" : "odd", i -> i)
.subscribe(System.out::println);
//输出为:odd= 9, even=10
collectSortedList
collectSortedList 操作符可以将流中的元素收集到一个有序的 List 中。
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.collectSortedList()
.subscribe(System.out::println);
//输出为:[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
collectMultimap
collectMultimap 操作符可以将流中的元素收集到一个 Multimap 中。
Flux.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
.collectMultimap(i -> i % 2 == 0 ? "even" : "odd")
.subscribe(System.out::println);
//输出为:odd=[1, 3, 5, 7, 9], even=[2, 4, 6, 8, 10]
对空值的处理
defaultIfEmpty
defaultIfEmpty 操作符可以在流中没有元素时发出一个默认值。
Flux.empty()
.defaultIfEmpty("default")
.subscribe(System.out::println);
//输出为:default
switchIfEmpty
switchIfEmpty 操作符可以在流中没有元素时切换到另一个流。
Flux.empty()
.switchIfEmpty(Flux.just(1, 2, 3))
.subscribe(System.out::println);
//输出为:1 2 3
对错误的处理
onErrorReturn
onErrorReturn 操作符可以在流中发生错误时发出一个默认值。
Flux.just(1, 2, 3)
.concatWith(Mono.error(new RuntimeException()))
.onErrorReturn(0)
.subscribe(System.out::println);
//输出为:1 2 3 0
onErrorResume
onErrorResume 操作符可以在流中发生错误时切换到另一个流。
Flux.just(1, 2, 3)
.concatWith(Mono.error(new RuntimeException()))
.onErrorResume(e -> Flux.just(4, 5, 6))
.subscribe(System.out::println);
//输出为:1 2 3 4 5 6
onErrorContinue
onErrorContinue 操作符可以在流中发生错误时继续发出流中的元素。
Flux.just(1, 2, 3)
.concatWith(Mono.error(new RuntimeException()))
.onErrorContinue((e, o) -> System.out.println("error: " + e.getMessage() + ", object: " + o))
.subscribe(System.out::println);
//输出为:1 2 3 error: java.lang.RuntimeException, object: 4
retry
retry 操作符可以在流中发生错误时重试。
Flux.just(1, 2, 3)
.concatWith(Mono.error(new RuntimeException()))
.retry(1)
.subscribe(System.out::println);
//输出为:1 2 3 1 2 3
retryWhen
retryWhen 操作符可以在流中发生错误时根据指定的条件重试。
Flux.just(1, 2, 3)
.concatWith(Mono.error(new RuntimeException()))
.retryWhen(Retry.fixedDelay(2, Duration.ofSeconds(1)))
.subscribe(System.out::println);
//输出为:1 2 3 1 2 3 1 2 3
timeout
timeout 操作符可以在流中发生超时时发出一个默认值。
Flux.just(1, 2, 3)
.delayElements(Duration.ofSeconds(1))
.timeout(Duration.ofMillis(500), Mono.just(0))
.subscribe(System.out::println);
//输出为:1 2 3 0
timeoutTo
timeoutTo 操作符可以在流中发生超时时切换到另一个流。
Flux.just(1, 2, 3)
.delayElements(Duration.ofSeconds(1))
.timeout(Duration.ofMillis(500), Flux.just(reactor之操作符