如何在 Spring Webflux / Reactor Netty Web 应用程序中执行阻塞调用
Posted
技术标签:
【中文标题】如何在 Spring Webflux / Reactor Netty Web 应用程序中执行阻塞调用【英文标题】:How to execute blocking calls within a Spring Webflux / Reactor Netty web application 【发布时间】:2018-09-19 15:28:25 【问题描述】:在我的用例中,我有一个带有 Reactor Netty 的 Spring Webflux 微服务,我有以下依赖项:
org.springframework.boot.spring-boot-starter-webflux
(2.0.1.RELEASE)
org.springframework.boot.spring-boot-starter-data-mongodb-reactive
(2.0.1.RELEASE)
org.projectreactor.reactor-spring
(1.0.1.RELEASE)
对于一个非常具体的情况,我需要从我的 Mongo 数据库中检索一些信息,并将其处理为使用我的响应式WebClient
发送的查询参数。由于WebClient
和UriComponentsBuilder
接受发布者(单声道/通量),我使用#block()
调用来接收结果。
由于reactor-core
(版本0.7.6.RELEASE)已包含在最新的spring-boot-dependencies
(版本2.0.1.RELEASE)中,因此无法再使用:block()/blockFirst()/blockLast() are blocking, which is not supported in thread xxx
,参见->@987654321 @
我的代码sn-p:
public Mono<FooBar> getFooBar(Foo foo)
MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
parameters.add("size", foo.getSize());
parameters.addAll("bars", barReactiveCrudRepository.findAllByIdentifierIn(foo.getBarIdentifiers()) // This obviously returns a Flux
.map(Bar::toString)
.collectList()
.block());
String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
.port(8081)
.path("/foo-bar")
.queryParams(parameters)
.build()
.toString();
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(FooBar.class);
这适用于 spring-boot
版本 2.0.0.RELEASE,但由于升级到版本 2.0.1.RELEASE 并因此从 reactor-core
升级到版本 0.7.6.RELEASE,它不再被允许。
我看到的唯一真正的解决方案是包含一个块(非反应性)存储库/mongo 客户端,但我不确定是否鼓励这样做。有什么建议?
【问题讨论】:
感谢您指出这一点。我们有非常相似的代码,在我们下次升级时会遇到一些问题。 【参考方案1】:WebClient
的请求 URL 不接受 Publisher
类型,但没有什么可以阻止您执行以下操作:
public Mono<FooBar> getFooBar(Foo foo)
Mono<List<String>> bars = barReactiveCrudRepository
.findAllByIdentifierIn(foo.getBarIdentifiers())
.map(Bar::toString)
.collectList();
Mono<FooBar> foobar = bars.flatMap(b ->
MultiValueMap<String, String> parameters = new LinkedMultiValueMap<>();
parameters.add("size", foo.getSize());
parameters.addAll("bars", b);
String url = UriComponentsBuilder.fromHttpUrl("https://base-url/")
.port(8081)
.path("/foo-bar")
.queryParams(parameters)
.build()
.toString();
return webClient.get()
.uri(url)
.retrieve()
.bodyToMono(FooBar.class);
);
return foobar;
如果有的话,这个新的反应堆核心检查使您免于在 WebFlux 处理程序中间使用这个阻塞调用使整个应用程序崩溃。
【讨论】:
这不是阻塞吗?如果是,可能一些 publishOn(Scheduler.elastic()) 或类似的东西会有意义,对吧? (不过,我不是这方面的专家) @Frischling 请指出您认为阻塞的部分。 整个 webClient.get()... 部分。它被打包在一个 Mono 中,是的,但最终数据被传输,并且可以任意慢;所以如果传输了很多数据,并且需要很多时间,它应该阻塞主线程,对吧? 调用 webClient.get() 不会对 I/O 做任何事情。订阅 Mono 将开始整个过程。该反应式管道没有阻塞 I/O,因此任何时候都不会阻塞任何线程。遥控器可能需要很长时间才能回答(我们无法解决整个世界的延迟问题),但仍然没有区别:没有阻塞操作,没有线程被阻塞。 谢谢布赖恩!这似乎做到了,我对反应式编程还比较陌生,这是我自己无法解决的一种情况。感谢您的帮助和很好的例子,希望对其他人也一样!以上是关于如何在 Spring Webflux / Reactor Netty Web 应用程序中执行阻塞调用的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 WebFlux 在 Spring Boot 2 中设置登录页面?
如何在 Spring Boot WebFlux 中使用 GET 请求注销
如何使用WebFlux在Spring Boot 2中设置登录页面?