是否可以并行启动 Mono 并汇总结果
Posted
技术标签:
【中文标题】是否可以并行启动 Mono 并汇总结果【英文标题】:Is it possible to start Mono's in parallel and aggregate the result 【发布时间】:2018-06-18 17:56:16 【问题描述】:我知道可以链接 Mono 的,例如,...
Mono<String> resultAMono = loadA();
Mono<String> resultBMono = resultA.flatMap(resultA -> loadB());
这将链接,resultBMono 将在 resultAMono 返回时运行....
所以我的问题是,是否可以并行启动 2 个 Mono,并且当两个返回继续另一个 Mono 时?
我认为它看起来像这样......
Mono<String> resultAMono = loadA();
Mono<String> resuktBMono = loadB();
Mono<Tuple2<Stirng, String> tupleMono = Mono.zip(resultAMono, resultBMono);
但我不知道这将并行运行,或者我可以做些什么来并行运行......
谢谢解答....
【问题讨论】:
【参考方案1】:2 种语义,1 种使它们并行运行的方法
我在下面介绍的两个选项都需要一些额外的调整以使 A 和 B Mono
并行运行:即,每个 Mono
应该使用 subscribeOn(Scheduler)
来退出它们合并的公共线程.
如果你只关心A和B的完成情况
使用when
监听A 和B 完成,使用then
继续使用完全不同的Mono
:
Mono.when(monoAwithSubscribeOn, monoBwithSubscribeOn)
.then(Mono.just("A and B finished, I don't know their value"));
如果您关心 A 值和 B 值
使用zip
+ map
/flatMap
,具体取决于您要对结果做什么。
Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
.map(tuple2 -> new Foo(tuple2.getT1(), tuple2.getT2(), "bar");
或
Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
.flatMap(tuple2 -> fetchMoreDataAsMono(tuple2.getT1(), tuple2.getT2()));
then
会忽略之前的数据,所以在它之前使用zip
没有多大意义。
另外,如果 A 或 B 之一为空,zip
将导致 empty Mono
!
使用switchIfEmpty
/defaultIfEmpty
来防止这种情况。
【讨论】:
++subscribeOn(Scheduler)
提示!
用于并行执行myMono.subscribeOn(Schedulers.boundedElastic());
以上是关于是否可以并行启动 Mono 并汇总结果的主要内容,如果未能解决你的问题,请参考以下文章