像 flatMap() 这样在同一主线程上运行的 Reactor 运算符是不是比常规阻塞代码更有效?

Posted

技术标签:

【中文标题】像 flatMap() 这样在同一主线程上运行的 Reactor 运算符是不是比常规阻塞代码更有效?【英文标题】:Are Reactor operators like flatMap() that runs on same main thread any more efficient than regular blocking code?像 flatMap() 这样在同一主线程上运行的 Reactor 运算符是否比常规阻塞代码更有效? 【发布时间】:2021-12-17 13:12:40 【问题描述】:

我了解许多反应器运算符(如 flatMap())在调用 onNext 方法的同一线程上运行。我想了解的是,这种方法是否比 for 循环中的常规阻塞调用更有效/性能更高。抱歉,如果这是一个菜鸟问题,但我似乎无法理解。反应式的威力只有在我们使用跳转线程的Scheduler(例如Scheduler.parallel())等情况下才能实现吗?

例如如果我有如下功能

List<Integer> integers = Arrays.asList(5,6,7,8,9,10);
    Flux.fromIterable(integers)
            .flatMap(this::makeAReactiveMethodCall)
            .subscribe(r -> log.info(Thread.currentThread().getName()));

日志看起来像这样 - 请注意所有线程都是相同的“主”线程。

01:25:40.641 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main
01:25:40.642 INFO ReactiveTest - main

所有调用都发生在同一个主线程上。这段代码如何比在具有阻塞语义的 for 循环中进行所有调用更有效?说出下面的代码?

integers.forEach(i -> this.makeAReactiveMethodCall(i).block());

【问题讨论】:

【参考方案1】:

假设每个makeAReactiveMethodCall 执行一些 I/O 并需要 1 秒才能完成。使用flatMap 运算符,您的调用将异步进行。这意味着主线程将在不等待 I/O 操作完成(非阻塞)的情况下进行所有 6 次调用,相反,它会处理一些其他工作,并在调用完成时收到通知。在WebClient 和 Project Reactor 的情况下,这是通过使用 Netty 事件循环来排队/调度/处理事件来实现的。

在传统的阻塞方式中(例如RestTemplate),同步进行所有 6 个调用需要 6 秒。当然,您可以使用ExecutorService API 使其异步,但在这种情况下,您将需要 6 个线程,因为调用会阻塞。反应式模型的优点之一是线程数量有限,因此在创建多个线程时不会浪费资源。

【讨论】:

谢谢。因此,即使不需要额外的线程,主线程也可以并行进行 6 次调用?我想这是我无法掌握的。这怎么可能,是否有其他线程正在监听来自这 6 个调用的响应?谁产生了那个线程? @abhishekmishra 我已经更新了第一段。 所以 Project reactor 总是引入 netty?我想我可以自己检查一下。想知道为什么这部分在文档中没有明确说明还有其他一些工作人员在做家务 - 事件循环。 使用 Spring WebFlux 可以切换到另一个嵌入式容器,例如 Tomcat 及其异步 Servlet 3.1 API,但需要 spring-boot-starter-reactor-netty 才能使用 WebClient。见docs.spring.io/spring-boot/docs/2.1.5.RELEASE/reference/html/… 是的。所以我的理解是,任何促进对另一个系统进行反应性调用的库都会带来包含 EventLoop 实现的依赖项。如果您建议我们以反应方式进行 API 调用,我们可以使用 Spring WebFlux,它会引入提供 EventLoop 实现的 netty。那是对的吗?这样,只需执行flatMap(),我们就不会引入 EventLoop,但是当从系统发出实际的响应式调用时,EventLoop 就会出现。【参考方案2】:

这不是 makeReactiveMethodCall() 正在做 CPU 密集型工作,或者它根本不是真正的反应,而是伪装的阻塞调用。

makeReactiveMethodCall 引入一些延迟时效率更高,例如。通过以反应方式执行 I/O。

对于您可能要考虑的各种处理步骤,还需要在组合和使用统一抽象方面进行权衡。

但如果您追求的是 CPU 密集型代码的纯吞吐量,那么请务必使用良好的旧循环。

【讨论】:

谢谢。我了解处理步骤的组合和统一抽象的好处。我没有得到的是,当我的底层反应式管道仅使用一个主线程时,当方法 makeReactiveMethodCall 以反应式方式执行 I/O 时,它如何更有效?

以上是关于像 flatMap() 这样在同一主线程上运行的 Reactor 运算符是不是比常规阻塞代码更有效?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 RxScala/Java 中使用多个线程执行 map、filter、flatMap?

Python多线程是啥意思?

在同一个后台线程上运行 FMDB 进程?

这样学 Python 多线程与进程

我可以假设计划在串行队列上运行的块都将在同一个线程上运行吗?

有关子线程对UI的线程更新的说法