如何从 WebFlux 中的 Mono<List<T>> 中提取内容以将其传递到调用链中

Posted

技术标签:

【中文标题】如何从 WebFlux 中的 Mono<List<T>> 中提取内容以将其传递到调用链中【英文标题】:How to extract content from Mono<List<T>> in WebFlux to pass it down the call chain 【发布时间】:2022-01-06 20:45:15 【问题描述】:

我希望能够从Mono&lt;List&lt;Payload&gt;&gt; 中提取List&lt;Payload&gt; 以将其传递给下游服务进行处理(或者可能从read(RequestParams params) 方法返回,而不是返回void):

@PostMapping("/subset")
    public void read(@RequestBody RequestParams params)
        Mono<List<Payload>> result =  reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
        
        ....
    

其中reader.read(...) 是自动装配的 Spring 服务上的一个方法,它利用 webClient 从外部 Web 服务 API 获取数据:

public Mono<List<Payload>> read(String date, String assetClasses, String firmAccounts, String id, String password) 
        Flux<Payload> nodes = client
                .get()
                .uri(uriBuilder -> uriBuilder
                    .path("/api/subset")
                    .queryParam("payloads", true)
                    .queryParam("date", date)
                    .queryParam("assetClasses", assetClasses)
                    .queryParam("firmAccounts", firmAccounts)
                    .build())
                .headers(header -> header.setBasicAuth("abc123", "XXXXXXX"))
        .retrieve()
        .onStatus(HttpStatus::is4xxClientError, response -> 
            System.out.println("4xx error");
            return Mono.error(new RuntimeException("4xx"));
        )
        .onStatus(HttpStatus::is5xxServerError, response -> 
            System.out.println("5xx error");
            return Mono.error(new RuntimeException("5xx"));
                )
        .bodyToFlux(Payload.class);
        
        Mono<List<Payload>> records = nodes
                .collectList();
        
        return records;
    

在 WebFlux 中不允许阻塞 result.block() 并引发异常:

new IllegalStateException("block()/blockFirst()/blockLast() are blocking, which is not supported in thread ..." ;

在 WebFlux 中提取 Mono 内容的正确方法是什么? 是某种subscribe()吗?语法是什么?

提前谢谢你。

【问题讨论】:

【参考方案1】:

虽然以下确实会在日志中返回 Mono observable 的值:

@PostMapping("/subset")
@ResponseBody
public  Mono<ResponseEntity<List<Payload>>> read1(@RequestBody RequestParams params)
        Mono<List<Payload>> result =  reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
                
        return result
                 .map(e -> new ResponseEntity<List<PayloadByStandardBasis>>(e, HttpStatus.OK));
        
    

我所寻求的理解是一种使用 WebFlux 组合调用链的正确方法,其中一个操作员/分支的响应(作为 webclient 调用的结果而具体化,产生一组记录,如上面)可以向下游传递给另一个操作员/腿,以促进将这些记录保存在数据库中的副作用,或类似的效果。

将这些步骤中的每一个建模为单独的 REST 端点可能是一个好主意,然后为组合操作设置另一个端点,该端点以正确的顺序在内部调用每个独立的端点,或者其他设计选择更受欢迎?

这最终是我所寻求的理解,所以如果有人想分享示例代码以及意见以更好地实施上述步骤,我愿意接受最全面的答案。

谢谢。

【讨论】:

【参考方案2】:

没有“正确的方法”,这就是重点。为了获得你需要阻塞的值,阻塞在 webflux 中是不好的,原因有很多(我现在不会讨论)。

您应该做的是将发布者一路返回给调用客户端。

许多人通常难以理解的一件事是 webflux 与生产者(MonoFlux)和订阅者一起工作。

你的整个服务也是一个生产者,调用客户端可以看作订阅者。

将其视为一条长链,从数据源开始,到显示数据的客户端结束。

一个简单的经验法则是,数据的最终消费者就是订阅者,其他人都是生产者。

因此,在您的情况下,您只需将 Mono&lt;List&lt;T&gt; 返回给调用客户端。

@PostMapping("/subset")
public Mono<List<Payload>> read(@RequestBody RequestParams params)
    Mono<List<Payload>> result =  reader.read(params.getDate(), params.getAssetClasses(), params.getFirmAccounts(), params.getUserId(), params.getPassword());
        
    return result;

【讨论】:

@Toerktumalare 由于不允许阻塞,如何从该链末尾的Mono 中获取List&lt;Payload&gt;&gt; 你不要,框架本身会订阅你的链并对调用客户端产生响应。如果您需要在使用Mono#mapMono#flatMap 的列表上工作,为什么感觉需要获得List&lt;T&gt;,无需访问Mono 上下文之外的列表 使用read() 调用结果的客户端需要一个列表,然后它可以传递到下游,因此我需要提取一个列表并将其作为类型返回。问题是如何在生成的 Mono 上使用 mapflatMap 完成它? @SimeonLeyzerzon 你的问题毫无意义,我希望你知道这一点。由于该函数用@PostMapping 注释,我假设您的客户正在发出POST 请求。你写了I need to extract a List and return it as a type你是什么意思你需要把它作为一个类型返回?您需要的是框架将您返回的任何内容序列化为客户端使用的格式。 我发布的你甚至没有尝试过的代码(否则你不会写出你奇怪的问题)将自动序列化一个 http 响应,其中包含一个包含对象Payload 的 json 格式列表。请在询问任何其他问题之前尝试代码。我会再重复一次,你根本没有办法在没有阻塞的情况下获得List&lt;Payload&gt;...没有,没有办法,零,永远不会。

以上是关于如何从 WebFlux 中的 Mono<List<T>> 中提取内容以将其传递到调用链中的主要内容,如果未能解决你的问题,请参考以下文章

如何从 WebFlux 客户端正确排出/释放响应体?

如何在 spring boot webflux 上从 mono<user> 获取用户名?

如何在不阻塞的情况下从 Spring Webflux 中的 Mono 对象中提取数据?

Mono<ServerResponse> 与 Mono<ResponseEntity<MyPojo>> 作为 Java Spring Webflux @Reques

如何在 Spring Webflux 中返回 Mono<Map<String, Flux<Integer>>> 响应?

Spring webflux:从请求中消耗单声道或通量