在 Spring Webflux 中执行阻塞 JDBC 调用

Posted

技术标签:

【中文标题】在 Spring Webflux 中执行阻塞 JDBC 调用【英文标题】:Execute blocking JDBC call in Spring Webflux 【发布时间】:2019-07-18 17:34:17 【问题描述】:

我正在使用 Spring Webflux 和 Spring data jpa,使用 PostgreSql 作为后端数据库。 我不想在进行findsave 之类的数据库调用时阻塞主线程。 为了达到同样的效果,我在Controller 类和jdbcScheduler 服务类中有一个主调度程序。

我定义它们的方式是:

@Configuration
@EnableJpaAuditing
public class CommonConfig 

    @Value("$spring.datasource.hikari.maximum-pool-size")
    int connectionPoolSize;

    @Bean
    public Scheduler scheduler() 
        return Schedulers.parallel();
    

    @Bean
    public Scheduler jdbcScheduler() 
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(connectionPoolSize));
    

    @Bean
    public TransactionTemplate transactionTemplate(PlatformTransactionManager transactionManager) 
        return new TransactionTemplate(transactionManager);
    

现在,在我的服务层中执行 get/save 调用时,我会这样做:

    @Override
    public Mono<Config> getConfigByKey(String key) 
        return Mono.defer(
            () -> Mono.justOrEmpty(configRepository.findByKey(key)))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    

    @Override
    public Flux<Config> getAllConfigsAfterAppVersion(int appVersion) 
        return Flux
            .fromIterable(configRepository.findAllByMinAppVersionIsGreaterThanEqual(appVersion))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    

    @Override
    public Flux<Config> addConfigs(List<Config> configList) 
        return Flux.fromIterable(configRepository.saveAll(configList))
            .subscribeOn(jdbcScheduler)
            .publishOn(scheduler);
    

在控制器中,我这样做:

    @PostMapping
    @ResponseStatus(HttpStatus.CREATED)
    Mono<ResponseDto<List<Config>>> addConfigs(@Valid @RequestBody List<Config> configs) 
        return configService.addConfigs(configs).collectList()
            .map(configList -> new ResponseDto<>(HttpStatus.CREATED.value(), configList, null))
            .subscribeOn(scheduler);
    

这是正确的吗?和/或有更好的方法吗?

我的理解是:

.subscribeOn(jdbcScheduler)
.publishOn(scheduler);

该任务将在jdbcScheduler 线程上运行,稍后的结果将在我的主并行scheduler 上发布。这种理解正确吗?

【问题讨论】:

【参考方案1】:

您对publishOnsubscribeOn (see reference documentation in the reactor project about those operators) 的理解是正确的。

如果您在没有调度特定调度程序的情况下调用阻塞库,这些调用将阻塞少数可用线程之一(默认情况下,Netty 事件循环),您的应用程序将只能同时处理少数请求。

现在我不确定你这样做是为了达到什么目的。

首先,parallel scheduler is designed for CPU bound tasks,这意味着您将拥有与 CPU 内核一样多(或更多)的数量。在这种情况下,这就像将您的线程池大小设置为常规 Servlet 容器上的内核数。您的应用将无法处理大量并发请求。

即使您选择了更好的替代方案(如弹性调度程序),它仍然不如 Netty 事件循环,后者是在 Spring WebFlux 中本地调度请求处理的地方。

如果您的最终目标是性能和可扩展性,那么将阻塞调用封装在响应式应用中的性能可能会比您的常规 Servlet 容器更差。

您可以改用 Spring MVC 并且:

在处理阻塞库(如 JPA)时使用通常的阻塞返回类型 当您未绑定到此类库时,请使用 MonoFlux 返回类型

这不会是非阻塞的,但这仍然是异步的,您将能够并行执行更多工作而无需处理复杂性。

【讨论】:

嗨,Brian,感谢您的快速回复和准确的解释。我假设我的所有主要执行都将发生在主调度程序(在我的情况下是并行的)上,并且 jdbc 任务将被卸载到另一个 jdbcScheduler 而不阻塞我的主线程以响应调用。我从您的回答中了解到: • 我不应该在控制器层上使用订阅,而是将其留给 Netty 事件循环。做这样的事情的想法来自这里:dzone.com/articles/… 另外,如果可能的话,有人可以告诉我如果我不在我的客户端 -> 控制器 -> 服务 -> 存储库 -> 控制器 -> 客户端调用的任何地方添加 subscribeOn 会发生什么。(这里,客户端是外部的)它似乎在我的本地工作正常,这对高吞吐量网络服务器有什么影响? 如果您省略了publishOn,那么事情将在您在订阅中配置的调度程序上运行。所以不,网络循环上不会再发生事情了。 M 建议是使用 Spring MVC,因为它支持响应式返回类型。它可能比 WebFlux 包装阻塞调用表现更好 我完全不同意 如果您的最终目标是性能和可扩展性,那么在响应式应用程序中封装阻塞调用可能比常规 Servlet 容器的性能更差乙>。在响应式应用程序中包装阻塞类仍然比基于线程的 Servlet 容器更好,因为您只在阻塞调用而不是所有内容上使用基于线程的样式(线程池或 Servlet 容器)。【参考方案2】:

恕我直言,有一种方法可以更好地利用机器资源来执行此操作。按照文档,您可以在其他线程中wrap the call,然后您可以继续执行。

【讨论】:

以上是关于在 Spring Webflux 中执行阻塞 JDBC 调用的主要内容,如果未能解决你的问题,请参考以下文章

深入剖析 Spring WebFlux

Spring WebFlux 入门

在 Spring Webflux 中结合非阻塞和阻塞调用并返回结果

Spring之WebFlux

如何在不阻塞的情况下从 Spring Webflux 中的 Mono 对象中提取数据?

你的响应阻塞了没有?--Spring-WebFlux源码分析