项目 Reactor 中的并行通量与通量

Posted

技术标签:

【中文标题】项目 Reactor 中的并行通量与通量【英文标题】:Parallel Flux vs Flux in project Reactor 【发布时间】:2021-11-16 11:44:49 【问题描述】:

所以我从文档中了解到的是,并行通量本质上是将通量元素划分为单独的轨道。(基本上类似于分组)。就线程而言,这将是调度程序的工作。所以让我们考虑这样的情况。所有这些都将在通过 runOn() 方法提供的同一个调度程序实例上运行。 让我们考虑如下情况:

Mono<Response> = webClientCallAPi(..) //function returning Mono from webclient call

现在假设我们打了大约 100 个电话

Flux.range(0,100).subscribeOn(Schedulers.boundedElastic()).flatMap(i -> webClientCallApi(i)).collecttoList() // or subscribe somehow

如果我们使用 paralleFlux:

Flux.range(0,100).parallel().runOn(Schedulers.boundedElastic()).flatMap(i -> webClientCallApi(i)).sequential().collecttoList();

所以如果我的理解是正确的,它似乎很相似。那么 ParallelFlux 相对于 Flux 的优势是什么?什么时候应该使用 parallelFlux 而不是 Flux?

【问题讨论】:

【参考方案1】:

实际上,您可能很少需要使用平行助焊剂,包括在本例中。

在您的示例中,您启动了 100 个 Web 服务调用。请记住,执行此操作所需的实际 工作 非常低 - 您生成并触发异步请求,然后一段时间后您会收到回复。在该请求和响应之间,您根本没有做任何工作,发送每个请求时只需要 tiny 数量的 CPU 资源,而另一个 tiny 关于何时发送收到每个响应。 (这是使用异步框架发出 Web 请求的核心优势之一,在请求进行时您不会占用任何线程。)

如果您拆分此通量并并行运行,您的意思是希望拆分这些微量的 CPU 资源,以便它们可以在不同的 CPU 内核上同时运行。这绝对没有意义 - 拆分通量、并行运行它然后稍后组合它的开销将远远大于仅让它在正常的顺序调度程序上执行。

另一方面,假设我有一个Flux&lt;Integer&gt;,并且我想检查这些整数中的每一个是否是素数,或者可能是我想检查 BCrypt 哈希的密码的Flux&lt;String&gt;。这些类型的操作真正是 CPU 密集型的,因此在这种情况下,用于在内核之间拆分执行的并行通量可能很有意义。但实际上,这些情况在正常的反应器用例中很少发生。

(另外,作为结束说明,您几乎总是希望将Schedulers.parallel() 与平行通量一起使用,而不是Schedulers.boundedElastic()。)

【讨论】:

完全同意在反应堆场景中。但我的观点是决定并行运行的是调度程序正确(我的意思是真正创建线程和类似的东西)。但是,如果您执行 Flux.range(1,100).publishOn(Schedulers.parallel()) 之类的操作,它只会并行分配事物,对吗?我的意思是为什么要通过引入 ParalleFlux 来创建平行轨道(或者我称之为“分组”),如果这可以通过通量来完成? subscribeOn 和 publishOn 使用来自指定发布者的单个线程,不会并发 @DeekshithAnand “如果你做类似 Flux.range(1,100).publishOn(Schedulers.parallel()) 的事情,它只是并行分配事情对吗?” - 一点都不。那永远不能并行执行通量任务,因为底层的非并行通量一次只会为它们提供一个。您看到 Web 请求同时执行的事实并不是并行性的证明(而是并发性的证明。) 啊..我看到并发和并行性之间存在细微差别......这很清楚。太棒了!

以上是关于项目 Reactor 中的并行通量与通量的主要内容,如果未能解决你的问题,请参考以下文章

Project Reactor:仅在未发出第一项时通量超时

如果通量为空,如何发出过滤掉的错误

刷新反应组件或通量/减少中的逻辑?

关于多功能应用程序的状态与通量存储的问题

React.js - 通量与全局事件总线

React (JSX) 中的子级到父级通信没有通量