Spring Webflux:如何使用不同的线程进行请求和响应
Posted
技术标签:
【中文标题】Spring Webflux:如何使用不同的线程进行请求和响应【英文标题】:Spring Webflux: How to use different thread for request and response 【发布时间】:2019-12-01 06:37:24 【问题描述】:我正在使用 Spring Webflux,据我了解,通过使用它,用于接收请求的线程和用于响应的线程应该不同。但是,无论我使用 netty 还是 undertow,我最终都会使用相同的线程。
我的应用是一个简单的带有 mysql DB 的 crud 应用。我用的不是 r2dbc,而是 jdbc 加上 Executor 和 Scheduler。
如下日志所示,请求由线程[XNIO-1 I/O-6]处理,响应由同一线程给出。 通过这个,我假设线程被阻塞,直到数据库操作完成。我怎样才能解决这个问题?
这是日志
2019-07-23 17:49:10.051 INFO 132 --- [ main] org.xnio : XNIO version 3.3.8.Final
2019-07-23 17:49:10.059 INFO 132 --- [ main] org.xnio.nio : XNIO NIO Implementation Version 3.3.8.Final
2019-07-23 17:49:10.114 INFO 132 --- [ main] o.s.b.w.e.undertow.UndertowWebServer : Undertow started on port(s) 8080 (http)
2019-07-23 17:49:10.116 INFO 132 --- [ main] c.n.webflux.demo.WebfluxFunctionalApp : Started WebfluxFunctionalApp in 1.262 seconds (JVM running for 2.668)
2019-07-23 17:49:10.302 DEBUG 132 --- [ XNIO-1 I/O-6] o.s.w.s.adapter.HttpWebHandlerAdapter : [4c85975] HTTP GET "/api/findall"
2019-07-23 17:49:10.322 DEBUG 132 --- [ XNIO-1 I/O-6] s.w.r.r.m.a.RequestMappingHandlerMapping : [4c85975] Mapped to public reactor.core.publisher.Mono<java.util.List<com.webflux.demo.model.TypeStatus>> com.webflux.demo.controller.MonitoringController.findAll()
2019-07-23 17:49:10.337 DEBUG 132 --- [ XNIO-1 I/O-6] o.s.w.r.r.m.a.ResponseBodyResultHandler : Using 'application/json;charset=UTF-8' given [*/*] and supported [application/json;charset=UTF-8, application/*+json;charset=UTF-8, text/event-stream]
2019-07-23 17:49:10.338 DEBUG 132 --- [ XNIO-1 I/O-6] o.s.w.r.r.m.a.ResponseBodyResultHandler : [4c85975] 0..1 [java.util.List<com.webflux.demo.model.TypeStatus>]
2019-07-23 17:49:10.347 INFO 132 --- [pool-1-thread-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Starting...
2019-07-23 17:49:10.785 INFO 132 --- [pool-1-thread-1] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Start completed.
2019-07-23 17:49:10.838 DEBUG 132 --- [pool-1-thread-1] org.springframework.web.HttpLogging : [4c85975] Encoding [[com.webflux.demo.model.TypeStatus@7b4509cb, com.webflux.demo.model.TypeStatus@22676ebe, (truncated)...]
2019-07-23 17:49:10.949 DEBUG 132 --- [ XNIO-1 I/O-6] o.s.w.s.adapter.HttpWebHandlerAdapter : [4c85975] Completed 200 OK
我的道也是
@Repository
public class TypeStatusJdbcTemplate
private JdbcTemplate jdbcTemplate;
public TypeStatusJdbcTemplate(JdbcTemplate jdbcTemplate)
this.jdbcTemplate = jdbcTemplate;
private final static String SQL_FIND_ALL = "select * from `monitoring`.`type_status` limit 3";
public List<TypeStatus> findAll()
return jdbcTemplate.query(SQL_FIND_ALL,
new TypeStatusMapper());
服务是
@Service
public class MonitoringService
private final Scheduler scheduler;
private TypeStatusJdbcTemplate repository;
public MonitoringService(Scheduler scheduler, TypeStatusJdbcTemplate repository)
this.scheduler = scheduler;
this.repository = repository;
public Mono<List<TypeStatus>> findAll()
return Mono.fromCallable(repository::findAll).subscribeOn(scheduler);
控制器是
@RestController
@RequestMapping("/api")
public class MonitoringController
private final MonitoringService monitoringService;
private static final Logger logger = LoggerFactory.getLogger(MonitoringController.class);
public MonitoringController(MonitoringService monitoringService)
this.monitoringService = monitoringService;
@GetMapping(value="/findall")
public Mono<List<TypeStatus>> findAll()
return monitoringService.findAll();
主文件(显示调度器)
@SpringBootApplication
public class WebfluxFunctionalApp
public static void main(String[] args)
SpringApplication.run(WebfluxFunctionalApp.class, args);
@PostConstruct
public void init()
// Setting Spring Boot SetTimeZone
TimeZone.setDefault(TimeZone.getTimeZone("UTC"));
@Bean
public Scheduler jdbcScheduler()
return Schedulers.fromExecutor(Executors.newFixedThreadPool(30));
【问题讨论】:
自动连接Scheduler
的确切类型是什么?
无论如何,通过使用subscribeOn
,所有阻塞的数据库工作都被委托给另一个线程,这就是XNIO-1 I/O-6
不会被阻塞的原因。除非您不自动装配 netty 调度程序;)
我添加了显示调度程序的代码。我不太明白你说的第二条评论是什么意思。我希望 XNIO-1 I/O-6 不会被阻塞。但是,我认为它被阻塞了,因为它也用于处理请求和响应,而不是另一个线程承担这项工作。
也许你对“应该”的理解太严格了……试试更多的负载! ...而且:我看到(单声道)从“池-”线程中记录!
尝试在Mono.fromCallable(()->)
块中放置类似System.out.println("callable thread: "+Thread.currentThread().getName());
的内容。您将看到类似callable thread: pool-1-thread-1
的内容。它表明请求由 XNIO-1 线程处理,然后将控制权从线程池传递给线程,当响应准备好时,将其传递回 NIO-1。我认为这几乎可以回答您想要的问题 - XNIO-1 没有被阻塞,因为所有阻塞工作都在另一个线程上执行,而 XNIO-1 仅用于处理请求和响应。
【参考方案1】:
线程执行并不总是必须是不同的线程。取自 Reactive 文档:
Reactive Schedulers
获得 Flux 或 Mono 并不一定意味着它将在专用线程中运行。相反,大多数运算符继续在前一个运算符执行的线程中工作。除非指定,否则最顶层的运算符(源)本身运行在进行 subscribe() 调用的线程上。
所以没有什么说它必须是一个新线程。
【讨论】:
除非指定 - 这里是通过将subscribeOn
放在链中来指定的。以上是关于Spring Webflux:如何使用不同的线程进行请求和响应的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Spring Webflux / WebClient 中设置事件循环池大小?