是否可以并行启动 Mono 并汇总结果

Posted

技术标签:

【中文标题】是否可以并行启动 Mono 并汇总结果【英文标题】:Is it possible to start Mono's in parallel and aggregate the result 【发布时间】:2018-06-18 17:56:16 【问题描述】:

我知道可以链接 Mono 的,例如,...

Mono<String> resultAMono = loadA();
Mono<String> resultBMono = resultA.flatMap(resultA -> loadB());

这将链接,resultBMono 将在 resultAMono 返回时运行....

所以我的问题是,是否可以并行启动 2 个 Mono,并且当两个返回继续另一个 Mono 时?

我认为它看起来像这样......

Mono<String> resultAMono = loadA();
Mono<String> resuktBMono = loadB();
Mono<Tuple2<Stirng, String> tupleMono = Mono.zip(resultAMono, resultBMono);

但我不知道这将并行运行,或者我可以做些什么来并行运行......

谢谢解答....

【问题讨论】:

【参考方案1】:

2 种语义,1 种使它们并行运行的方法

我在下面介绍的两个选项都需要一些额外的调整以使 A 和 B Mono 并行运行:即,每个 Mono 应该使用 subscribeOn(Scheduler) 来退出它们合并的公共线程.

如果你只关心A和B的完成情况

使用when 监听A 和B 完成,使用then 继续使用完全不同的Mono

Mono.when(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .then(Mono.just("A and B finished, I don't know their value"));

如果您关心 A 值和 B 值

使用zip + map/flatMap,具体取决于您要对结果做什么。

Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .map(tuple2 -> new Foo(tuple2.getT1(), tuple2.getT2(), "bar");

Mono.zip(monoAwithSubscribeOn, monoBwithSubscribeOn)
    .flatMap(tuple2 -> fetchMoreDataAsMono(tuple2.getT1(), tuple2.getT2()));

then 会忽略之前的数据,所以在它之前使用zip 没有多大意义。

另外,如果 A 或 B 之一为空,zip 将导致 empty Mono 使用switchIfEmpty/defaultIfEmpty 来防止这种情况。

【讨论】:

++ subscribeOn(Scheduler) 提示! 用于并行执行myMono.subscribeOn(Schedulers.boundedElastic());

以上是关于是否可以并行启动 Mono 并汇总结果的主要内容,如果未能解决你的问题,请参考以下文章

如何启动并行协程并返回结果

并行运行两个异步任务并在 .NET 4.5 中收集结果

oracle使用并行踩过的坑

并行运行函数并使用队列获取输出

使用 Reactive 并行处理 List<Mono<Object>>

Fork/Join框架详解