我可以运行 2 个 Mono 异步 Reactor Core 吗?
Posted
技术标签:
【中文标题】我可以运行 2 个 Mono 异步 Reactor Core 吗?【英文标题】:Can I run 2 Mono asynchronous Reactor Core? 【发布时间】:2018-11-07 07:57:16 【问题描述】:在java中:
Mono<response> response = mon.just()
Mono<object> object = mono.just()
return response.block()
响应和对象不相互依赖。 有没有办法同时并行运行 2 个单声道?
【问题讨论】:
【参考方案1】:方法不止一种。一种简单的解决方案是使用subscribeOn
运算符:
Mono<response> response = Mono.just(...).subscribeOn(Schedulers.boundedElastic());
Mono<object> object = Mono.just(...).subscribeOn(Schedulers.boundedElastic());
每当有人订阅您的 Monos 时,它都会发生在另一个线程中。在这种情况下,线程取自现有池。 Schedulers
提供了多种方法,允许您使用现有线程或创建新线程(请参阅documentation)。
如果您对反应式流和多线程感兴趣,我最近写了一封 article 来讨论它。
【讨论】:
我在 Spring Boot 应用程序(版本2.4.1
)上与 Mono.zip(...).block()
并行运行了一些测试,有和没有 subscribeOn(Schedulers
:与顺序相比,并行执行时间几乎减半,添加subscribeOn(Schedulers
没有任何区别。这可能是因为 Spring Boot 自动将 Reactor Netty 配置为默认服务器,而默认服务器又默认使用弹性调度程序线程。【参考方案2】:
如果这 2 个Mono
s 没有任何关联,并且您想并行运行它们,那么我建议您考虑一下您的设计。
如果您想并行运行它们并在两个结果都可用时使用它们的结果:
Mono<Integer> source1 = Mono.just(1).subscribeOn(Schedulers.elastic());
Mono<String> source2 = Mono.just("aaaa").subscribeOn(Schedulers.elastic());
Mono.zip(source1, source2, (integer, string) -> string.concat(integer.toString()))
.subscribe(x -> System.out.println(x));
输出:
"aaaa1"
如果结果类型相同(但不是必需的),那么您可以这样做:
Mono<Integer> source1 = Mono.just(1).subscribeOn(Schedulers.elastic());
Mono<Integer> source2 = Mono.just(2).subscribeOn(Schedulers.elastic());
Flux.merge(source1, source2)
.map(number -> number * 10)
.subscribe(x -> System.out.println(x));
请注意,您不能指望哪个元素首先可用,因此我们使用 Flux
表示 2 个结果,而在第一个示例中,我们使用 Mono
表示一个包含 2 个结果的结果。
输出:
10
20
or
20
10
More information about Scheduler
class, available Schedulers in reactor lib and subscribeOn operation.
【讨论】:
以上是关于我可以运行 2 个 Mono 异步 Reactor Core 吗?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Reactor 的 StepVerifier 来验证 Mono 是不是为空?