我可以使用从 Spring5 的 WebClient 返回的 Flux 的 block() 方法吗?

Posted

技术标签:

【中文标题】我可以使用从 Spring5 的 WebClient 返回的 Flux 的 block() 方法吗?【英文标题】:Can I use block() method of Flux returned from Spring5's WebClient? 【发布时间】:2018-07-09 02:56:34 【问题描述】:

我创建了 Spring Boot 2.0 演示应用程序,其中包含两个使用 WebClient 进行通信的应用程序。当我从 WebClient 的响应中使用 Flux 的 block() 方法时,他们经常停止通信,我很痛苦。由于某些原因,我想使用 List 而不是 Flux。

服务器端应用程序是这样的。它只是返回 Flux 对象。

@GetMapping
public Flux<Item> findAll() 
    return Flux.fromIterable(items);

而客户端(或 BFF 端)应用程序是这样的。我从服务器获取 Flux 并通过调用 block() 方法将其转换为 List。

@GetMapping
public List<Item> findBlock() 
    return webClient.get()
        .retrieve()
        .bodyToFlux(Item.class)
        .collectList()
        .block(Duration.ofSeconds(10L));

虽然一开始运行良好,但 findBlock() 多次访问后不会响应并超时。当我修改 findBlock() 方法以返回 Flux 删除 collectList() 和 block() 时,它运行良好。然后我假设 block() 方法会导致这个问题。 而且,当我修改 findAll() 方法以返回 List 时,没有任何变化。

整个示例应用的源代码在这里。https://github.com/cero-t/webclient-example

“resource”是服务器应用程序,“front”是客户端应用程序。运行这两个应用程序后,当我访问 localhost:8080 时,它运行良好,我可以随时重新加载,但是当我访问 localhost:8080/block 时,它似乎运行良好,但在多次重新加载后它不会响应。


顺便说一句,当我将“spring-boot-starter-web”依赖项添加到“前端”应用程序(而不是资源应用程序)的 pom.xml 中时,这意味着我使用的是 tomcat,这个问题永远不会发生。是不是 Netty 服务器的问题?

任何指导将不胜感激。

【问题讨论】:

【参考方案1】:

首先,我要指出的是,仅在从内存中获取 items 且不涉及 I/O 时才建议使用 Flux.fromIterable(items)。否则,您可能会使用阻塞 API 来获取它——这可能会破坏您的反应式应用程序。在这种情况下,这是一个内存列表,所以没问题。注意你也可以去Flux.just(item1, item2, item3)

使用以下是最有效的:

@GetMapping("/")
public Flux<Item> findFlux() 
  return webClient.get()
    .retrieve()
    .bodyToFlux(Item.class);

Item 实例将以非常有效的方式即时读取/写入、解码/编码。

另一方面,这不是首选方式:

@GetMapping("/block")
public List<Item> findBlock() 
  return webClient.get()
    .retrieve()
    .bodyToFlux(Item.class)
    .collectList()
    .block(Duration.ofSeconds(10L));

在这种情况下,您的前端应用程序使用collectList 在内存中缓冲整个项目列表,但同时也阻塞了少数可用的服务器线程之一。这可能会导致性能非常差,因为您的服务器可能会被阻止等待该数据并且无法同时为其他请求提供服务。

在这种特殊情况下,情况更糟,因为应用程序完全崩溃了。 查看控制台,我们可以看到以下内容:

WARN 3075 --- [ctor-http-nio-7] io.netty.util.concurrent.DefaultPromise  : An exception was thrown by reactor.ipc.netty.channel.PooledClientContextHandler$$Lambda$532/356589024.operationComplete()

reactor.core.Exceptions$BubblingException: java.lang.IllegalArgumentException: Channel [id: 0xab15f050, L:/127.0.0.1:59350 - R:localhost/127.0.0.1:8081] was not acquired from this ChannelPool
    at reactor.core.Exceptions.bubble(Exceptions.java:154) ~[reactor-core-3.1.3.RELEASE.jar:3.1.3.RELEASE]

这可能与应该在 0.7.4.RELEASE 中修复的 reactor-netty client connection pool issue 相关联。我不知道具体情况,但我怀疑整个连接池都被损坏了,因为 HTTP 响应没有从客户端连接中正确读取。

添加spring-boot-starter-web 确实使您的应用程序使用Tomcat,但它主要将您的Spring WebFlux 应用程序变成一个Spring MVC 应用程序(它现在支持一些响应式返回类型,但具有不同的运行时模型)。如果您希望使用 Tomcat 测试您的应用程序,您可以将 spring-boot-starter-tomcat 添加到您的 POM 中,这将使用 Tomcat 和 Spring WebFlux。

【讨论】:

非常感谢您的详细解释!!我理解得很好,现在我知道我应该将 Flux 目录返回给客户端,而不是阻塞线程。您对 reactor-netty 问题的引用也很有帮助。我了解情况。再次感谢!

以上是关于我可以使用从 Spring5 的 WebClient 返回的 Flux 的 block() 方法吗?的主要内容,如果未能解决你的问题,请参考以下文章

Spring 从0开始Spring5 新功能 - @Nullable 注解和函数式注册对象

众所期待,如约而至 | 尚硅谷Spring5视频教程发布!

如何使用 WebClient 执行同步请求?

你了解Spring从Spring3到Spring5的变迁吗?

Spring5源码解析1-从启动容器开始

Spring 5 AOP 默认改用 CGLIB 了?从现象到源码的深度分析