Spring Webflux:如何使用不同的线程进行请求和响应

Posted

技术标签:

【中文标题】Spring Webflux:如何使用不同的线程进行请求和响应【英文标题】:Spring Webflux: How to use different thread for request and response 【发布时间】:2019-12-01 06:37:24 【问题描述】:

我正在使用 Spring Webflux,据我了解,通过使用它,用于接收请求的线程和用于响应的线程应该不同。但是,无论我使用 netty 还是 undertow,我最终都会使用相同的线程。

我的应用是一个简单的带有 mysql DB 的 crud 应用。我用的不是 r2dbc,而是 jdbc 加上 Executor 和 Scheduler。

如下日志所示,请求由线程[XNIO-1 I/O-6]处理,响应由同一线程给出。 通过这个,我假设线程被阻塞,直到数据库操作完成。我怎样才能解决这个问题?

这是日志

2019-07-23 17:49:10.051  INFO 132 --- [           main] org.xnio                                 : XNIO version 3.3.8.Final
2019-07-23 17:49:10.059  INFO 132 --- [           main] org.xnio.nio                             : XNIO NIO Implementation Version 3.3.8.Final
2019-07-23 17:49:10.114  INFO 132 --- [           main] o.s.b.w.e.undertow.UndertowWebServer     : Undertow started on port(s) 8080 (http)
2019-07-23 17:49:10.116  INFO 132 --- [           main] c.n.webflux.demo.WebfluxFunctionalApp    : Started WebfluxFunctionalApp in 1.262 seconds (JVM running for 2.668)
2019-07-23 17:49:10.302 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.s.adapter.HttpWebHandlerAdapter    : [4c85975] HTTP GET "/api/findall"
2019-07-23 17:49:10.322 DEBUG 132 --- [   XNIO-1 I/O-6] s.w.r.r.m.a.RequestMappingHandlerMapping : [4c85975] Mapped to public reactor.core.publisher.Mono<java.util.List<com.webflux.demo.model.TypeStatus>> com.webflux.demo.controller.MonitoringController.findAll()
2019-07-23 17:49:10.337 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.r.r.m.a.ResponseBodyResultHandler  : Using 'application/json;charset=UTF-8' given [*/*] and supported [application/json;charset=UTF-8, application/*+json;charset=UTF-8, text/event-stream]
2019-07-23 17:49:10.338 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.r.r.m.a.ResponseBodyResultHandler  : [4c85975] 0..1 [java.util.List<com.webflux.demo.model.TypeStatus>]
2019-07-23 17:49:10.347  INFO 132 --- [pool-1-thread-1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Starting...
2019-07-23 17:49:10.785  INFO 132 --- [pool-1-thread-1] com.zaxxer.hikari.HikariDataSource       : HikariPool-1 - Start completed.
2019-07-23 17:49:10.838 DEBUG 132 --- [pool-1-thread-1] org.springframework.web.HttpLogging      : [4c85975] Encoding [[com.webflux.demo.model.TypeStatus@7b4509cb, com.webflux.demo.model.TypeStatus@22676ebe, (truncated)...]
2019-07-23 17:49:10.949 DEBUG 132 --- [   XNIO-1 I/O-6] o.s.w.s.adapter.HttpWebHandlerAdapter    : [4c85975] Completed 200 OK

我的道也是

@Repository
public class TypeStatusJdbcTemplate 
    private JdbcTemplate jdbcTemplate;

    public TypeStatusJdbcTemplate(JdbcTemplate jdbcTemplate) 
        this.jdbcTemplate = jdbcTemplate;
    

    private final static String SQL_FIND_ALL = "select * from `monitoring`.`type_status` limit 3";


    public List<TypeStatus> findAll() 
        return jdbcTemplate.query(SQL_FIND_ALL,
                new TypeStatusMapper());
    

服务是

@Service
public class MonitoringService 
    private final Scheduler scheduler;
    private TypeStatusJdbcTemplate repository;

    public MonitoringService(Scheduler scheduler, TypeStatusJdbcTemplate repository) 
        this.scheduler = scheduler;
        this.repository = repository;
    

    public Mono<List<TypeStatus>> findAll() 
        return Mono.fromCallable(repository::findAll).subscribeOn(scheduler);
    


控制器是

@RestController
@RequestMapping("/api")
public class MonitoringController 
    private final MonitoringService monitoringService;
    private static final Logger logger = LoggerFactory.getLogger(MonitoringController.class);

    public MonitoringController(MonitoringService monitoringService) 
        this.monitoringService = monitoringService;
    

    @GetMapping(value="/findall")
    public Mono<List<TypeStatus>> findAll() 
        return monitoringService.findAll();
    

主文件(显示调度器)

@SpringBootApplication
public class WebfluxFunctionalApp 
    public static void main(String[] args)
        SpringApplication.run(WebfluxFunctionalApp.class, args);
    


    @PostConstruct
    public void init()
        // Setting Spring Boot SetTimeZone
        TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
    


    @Bean
    public Scheduler jdbcScheduler() 
        return Schedulers.fromExecutor(Executors.newFixedThreadPool(30));
    

【问题讨论】:

自动连接Scheduler 的确切类型是什么? 无论如何,通过使用subscribeOn,所有阻塞的数据库工作都被委托给另一个线程,这就是XNIO-1 I/O-6 不会被阻塞的原因。除非您不自动装配 netty 调度程序;) 我添加了显示调度程序的代码。我不太明白你说的第二条评论是什么意思。我希望 XNIO-1 I/O-6 不会被阻塞。但是,我认为它被阻塞了,因为它也用于处理请求和响应,而不是另一个线程承担这项工作。 也许你对“应该”的理解太严格了……试试更多的负载! ...而且:我看到(单声道)从“池-”线程中记录! 尝试在Mono.fromCallable(()-&gt;) 块中放置类似System.out.println("callable thread: "+Thread.currentThread().getName()); 的内容。您将看到类似callable thread: pool-1-thread-1 的内容。它表明请求由 XNIO-1 线程处理,然后将控制权从线程池传递给线程,当响应准备好时,将其传递回 NIO-1。我认为这几乎可以回答您想要的问题 - XNIO-1 没有被阻塞,因为所有阻塞工作都在另一个线程上执行,而 XNIO-1 仅用于处理请求和响应。 【参考方案1】:

线程执行并不总是必须是不同的线程。取自 Reactive 文档:

Reactive Schedulers

获得 Flux 或 Mono 并不一定意味着它将在专用线程中运行。相反,大多数运算符继续在前一个运算符执行的线程中工作。除非指定,否则最顶层的运算符(源)本身运行在进行 subscribe() 调用的线程上。

所以没有什么说它必须是一个新线程。

【讨论】:

除非指定 - 这里是通过将 subscribeOn 放在链中来指定的。

以上是关于Spring Webflux:如何使用不同的线程进行请求和响应的主要内容,如果未能解决你的问题,请参考以下文章

Spring WebFlux创建了无阻塞线程池

Spring Boot webflux介绍

深入剖析 Spring WebFlux

如何在 Spring Webflux / WebClient 中设置事件循环池大小?

如何在 spring-mvc 中将日志记录添加到 webflux 端点?

使用Webflux和Spring Cloud时如何用netty替换tomcat?