从 WebClient 流式传输到 Flux。阻塞超时抛出异常
Posted
技术标签:
【中文标题】从 WebClient 流式传输到 Flux。阻塞超时抛出异常【英文标题】:Stream from WebClient into Flux. Blocking timeout throws exception 【发布时间】:2020-10-01 17:27:55 【问题描述】:我在 Spring Boot 应用程序中使用 WebClient 调用流式 API。
我想检索元素,直到我收到 10 个元素,或者 10 秒过去了。我希望请求被阻止,直到其中任何一个先发生。
WebClient client = WebClient.builder().baseUrl(URL).build();
List<Item> items = client
.get()
.retrieve()
.bodyToFlux(Item.class)
.limitRequest(10)
.collectList()
.block(Duration.ofSeconds(10));
如果在超时之前检索到 10 个项目,调用会很好地返回,并且我有一个包含 10 个项目的填充列表。
但是,如果先超时,则会引发以下异常,并且不会返回任何项目。
java.lang.IllegalStateException: Timeout on blocking read for 10000 MILLISECONDS
如何读取最长 x 秒的流,然后使用 WebClient 返回检索到的项目?
【问题讨论】:
【参考方案1】:我想检索元素,直到我收到 10 个元素,或者 10 秒过去了。
听起来像 bufferTimeout()
正是你所追求的。
将传入的值收集到多个 List 缓冲区中,每次缓冲区达到最大大小或 maxTime Duration 过去时,返回的 Flux 将发出这些缓冲区。
您只需要这些缓冲区之一。在响应式上下文中,您只需在生成的通量上调用 next()
- 因为您只想阻止,您可以调用 blockFirst()
。
类似:
List<Item> items = client
.get()
.retrieve()
.bodyToFlux(Item.class)
.bufferTimeout(10, Duration.ofSeconds(10)) //first parameter is max number of elements, second is timeout
.blockFirst();
【讨论】:
.next().block()
可以替换为blockFirst()
太棒了。 bufferTimeout()
确实是我一直在寻找的,再加上 blockFirst()
。以上是关于从 WebClient 流式传输到 Flux。阻塞超时抛出异常的主要内容,如果未能解决你的问题,请参考以下文章
Webclient 与简单 Flux.just 的 Flux 行为不同
我可以使用从 Spring5 的 WebClient 返回的 Flux 的 block() 方法吗?