如何从 WebFlux 中的 Mono<List<T>> 中提取内容以将其传递到调用链中
Posted
技术标签:
【中文标题】如何从 WebFlux 中的 Mono<List<T>> 中提取内容以将其传递到调用链中【英文标题】:How to extract content from Mono<List<T>> in WebFlux to pass it down the call chain 【发布时间】:2022-01-06 20:45:15 【问题描述】:我希望能够从Mono<List<Payload>>
中提取List<Payload>
以将其传递给下游服务进行处理(或者可能从read(RequestParams params)
方法返回,而不是返回void
):
@PostMapping("/subset")
public void read(@RequestBody RequestParams params)
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
....
其中reader.read(...)
是自动装配的 Spring 服务上的一个方法,它利用 webClient 从外部 Web 服务 API 获取数据:
public Mono<List<Payload>> read(String date, String assetClasses, String firmAccounts, String id, String password)
Flux<Payload> nodes = client
.get()
.uri(uriBuilder -> uriBuilder
.path("/api/subset")
.queryParam("payloads", true)
.queryParam("date", date)
.queryParam("assetClasses", assetClasses)
.queryParam("firmAccounts", firmAccounts)
.build())
.headers(header -> header.setBasicAuth("abc123", "XXXXXXX"))
.retrieve()
.onStatus(HttpStatus::is4xxClientError, response ->
System.out.println("4xx error");
return Mono.error(new RuntimeException("4xx"));
)
.onStatus(HttpStatus::is5xxServerError, response ->
System.out.println("5xx error");
return Mono.error(new RuntimeException("5xx"));
)
.bodyToFlux(Payload.class);
Mono<List<Payload>> records = nodes
.collectList();
return records;
在 WebFlux 中不允许阻塞 result.block()
并引发异常:
new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread ..." ;
在 WebFlux 中提取 Mono 内容的正确方法是什么?
是某种subscribe()
吗?语法是什么?
提前谢谢你。
【问题讨论】:
【参考方案1】:虽然以下确实会在日志中返回 Mono observable 的值:
@PostMapping("/subset")
@ResponseBody
public Mono<ResponseEntity<List<Payload>>> read1(@RequestBody RequestParams params)
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
return result
.map(e -> new ResponseEntity<List<PayloadByStandardBasis>>(e, HttpStatus.OK));
我所寻求的理解是一种使用 WebFlux 组合调用链的正确方法,其中一个操作员/分支的响应(作为 webclient 调用的结果而具体化,产生一组记录,如上面)可以向下游传递给另一个操作员/腿,以促进将这些记录保存在数据库中的副作用,或类似的效果。
将这些步骤中的每一个建模为单独的 REST 端点可能是一个好主意,然后为组合操作设置另一个端点,该端点以正确的顺序在内部调用每个独立的端点,或者其他设计选择更受欢迎?
这最终是我所寻求的理解,所以如果有人想分享示例代码以及意见以更好地实施上述步骤,我愿意接受最全面的答案。
谢谢。
【讨论】:
【参考方案2】:没有“正确的方法”,这就是重点。为了获得你需要阻塞的值,阻塞在 webflux 中是不好的,原因有很多(我现在不会讨论)。
您应该做的是将发布者一路返回给调用客户端。
许多人通常难以理解的一件事是 webflux 与生产者(Mono
或 Flux
)和订阅者一起工作。
你的整个服务也是一个生产者,调用客户端可以看作订阅者。
将其视为一条长链,从数据源开始,到显示数据的客户端结束。
一个简单的经验法则是,数据的最终消费者就是订阅者,其他人都是生产者。
因此,在您的情况下,您只需将 Mono<List<T>
返回给调用客户端。
@PostMapping("/subset")
public Mono<List<Payload>> read(@RequestBody RequestParams params)
Mono<List<Payload>> result = reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
return result;
【讨论】:
@Toerktumalare 由于不允许阻塞,如何从该链末尾的Mono
中获取List<Payload>>
?
你不要,框架本身会订阅你的链并对调用客户端产生响应。如果您需要在使用Mono#map
或Mono#flatMap
的列表上工作,为什么感觉需要获得List<T>
,无需访问Mono
上下文之外的列表
使用read()
调用结果的客户端需要一个列表,然后它可以传递到下游,因此我需要提取一个列表并将其作为类型返回。问题是如何在生成的 Mono 上使用 map
或 flatMap
完成它?
@SimeonLeyzerzon 你的问题毫无意义,我希望你知道这一点。由于该函数用@PostMapping
注释,我假设您的客户正在发出POST
请求。你写了I need to extract a List and return it as a type
你是什么意思你需要把它作为一个类型返回?您需要的是框架将您返回的任何内容序列化为客户端使用的格式。
我发布的你甚至没有尝试过的代码(否则你不会写出你奇怪的问题)将自动序列化一个 http 响应,其中包含一个包含对象Payload
的 json 格式列表。请在询问任何其他问题之前尝试代码。我会再重复一次,你根本没有办法在没有阻塞的情况下获得List<Payload>
...没有,没有办法,零,永远不会。以上是关于如何从 WebFlux 中的 Mono<List<T>> 中提取内容以将其传递到调用链中的主要内容,如果未能解决你的问题,请参考以下文章
如何在 spring boot webflux 上从 mono<user> 获取用户名?
如何在不阻塞的情况下从 Spring Webflux 中的 Mono 对象中提取数据?
Mono<ServerResponse> 与 Mono<ResponseEntity<MyPojo>> 作为 Java Spring Webflux @Reques