我可以运行 2 个 Mono 异步 Reactor Core 吗?

Posted

技术标签:

【中文标题】我可以运行 2 个 Mono 异步 Reactor Core 吗?【英文标题】:Can I run 2 Mono asynchronous Reactor Core? 【发布时间】:2018-11-07 07:57:16 【问题描述】:

在java中:

    Mono<response> response = mon.just()
    Mono<object> object = mono.just()
    return response.block()

响应和对象不相互依赖。 有没有办法同时并行运行 2 个单声道?

【问题讨论】:

【参考方案1】:

方法不止一种。一种简单的解决方案是使用subscribeOn 运算符:

Mono<response> response = Mono.just(...).subscribeOn(Schedulers.boundedElastic());
Mono<object> object = Mono.just(...).subscribeOn(Schedulers.boundedElastic());

每当有人订阅您的 Monos 时,它都会发生在另一个线程中。在这种情况下,线程取自现有池。 Schedulers 提供了多种方法,允许您使用现有线程或创建新线程(请参阅documentation)。

如果您对反应式流和多线程感兴趣,我最近写了一封 article 来讨论它。

【讨论】:

我在 Spring Boot 应用程序(版本 2.4.1)上与 Mono.zip(...).block() 并行运行了一些测试,有和没有 subscribeOn(Schedulers:与顺序相比,并行执行时间几乎减半,添加subscribeOn(Schedulers 没有任何区别。这可能是因为 Spring Boot 自动将 Reactor Netty 配置为默认服务器,而默认服务器又默认使用弹性调度程序线程。【参考方案2】:

如果这 2 个Monos 没有任何关联,并且您想并行运行它们,那么我建议您考虑一下您的设计。


如果您想并行运行它们并在两个结果都可用时使用它们的结果:

Mono<Integer> source1 = Mono.just(1).subscribeOn(Schedulers.elastic());
Mono<String> source2 = Mono.just("aaaa").subscribeOn(Schedulers.elastic());

Mono.zip(source1, source2, (integer, string) -> string.concat(integer.toString()))
        .subscribe(x -> System.out.println(x));

输出:

"aaaa1"

如果结果类型相同(但不是必需的),那么您可以这样做:

Mono<Integer> source1 = Mono.just(1).subscribeOn(Schedulers.elastic());
Mono<Integer> source2 = Mono.just(2).subscribeOn(Schedulers.elastic());

Flux.merge(source1, source2)
        .map(number -> number * 10)
        .subscribe(x -> System.out.println(x));

请注意,您不能指望哪个元素首先可用,因此我们使用 Flux 表示 2 个结果,而在第一个示例中,我们使用 Mono 表示一个包含 2 个结果的结果。

输出:

10
20
or
20
10

More information about Scheduler class, available Schedulers in reactor lib and subscribeOn operation.

【讨论】:

以上是关于我可以运行 2 个 Mono 异步 Reactor Core 吗?的主要内容,如果未能解决你的问题,请参考以下文章

Reactor响应式编程(Mono)

学习响应式编程 Reactor - reactor 基础

如何在 Reactor 中使用 Mono 的内容

如何使用 Reactor 的 StepVerifier 来验证 Mono 是不是为空?

5分钟理解 SpringBoot 响应式的核心-Reactor

学习响应式编程 Reactor - reactor 转换类操作符