如何构建避免嵌套 Flux 块(Flux<Flux <T>>)的反应式架构?

Posted

技术标签:

【中文标题】如何构建避免嵌套 Flux 块(Flux<Flux <T>>)的反应式架构?【英文标题】:How to build reactive architecture that will avoid nested Flux blocks (Flux<Flux <T>>)? 【发布时间】:2019-10-16 04:43:41 【问题描述】:

我正在尝试构建一个应用程序 A(如适配器),它将:

1) 接收带有一些密钥(JSON 格式)的 POST 请求

2) 它应该以某种方式修改该密钥并创建对另一个系统 B 的 POST 请求。

3) 应用程序 A 应解析来自应用程序 B 的响应并修改该响应。

4) 之后,我的应用程序 A 应该响应初始 POST 请求。

@RestController
@RequestMapping("/A")
public class Controller 
    @ResponseStatus(HttpStatus.OK)
    @PostMapping(value = "B", consumes = APPLICATION_JSON_VALUE)
    // to return nested Flux is a bad idea here
    private Flux<Flux<Map<String, ResultClass>>> testUpdAcc(@RequestBody Flux<Map<String, SomeClass>> keys) 
        return someMethod(keys);
    

    // the problem comes here when I will get Flux<Flux<T>> in the return
    public Flux<Flux<Map<String, ResultClass>>> someMethod(Flux<Map<String, SomeClass>> keysFlux) 
        return keysFlux.map(keysMap -> 
                                // do something with keys and create URL
                                // also will batch keys here
                                <...>

                                // for each batch of keys:
                                WebClient.create(hostAndPort)
                                .method(HttpMethod.POST)
                                .uri(url)
                                .body(BodyInserters.fromObject(body))
                                .header(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")
                                .accept(MediaType.APPLICATION_JSON)
                                .retrieve()
                                .bodyToMono(schema) // response will be parsed into some schema here
                                .retryWhen (// will make a retry mechanism here)

                                // ===== will join all Mono batches into single Flux
                                Flux.concat(...);
                                
                             );
    


当然,这可以通过不将 keysFlux 读取为 Flux 并将其读取为 Map 来解决。但这应该让一切变得不那么被动,不是吗? :)

    @ResponseStatus(HttpStatus.OK)
    @PostMapping(value = "B", consumes = APPLICATION_JSON_VALUE)
    // to return nested Flux is a bad idea here
    private Flux<Map<String, ResultClass>> testUpdAcc(@RequestBody Map<String, SomeClass> keys) 
        return someMethod(keys);
    

我也曾尝试在返回请求前的最后一刻使用 block()/blockFirst(),但出现错误:

block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor...

感谢您的想法!

【问题讨论】:

直觉 id 说它应该是 Flux> someMethod(Flux>. 或定义 MyFlux extends Flux > 这个 Flux 到底在做什么? 【参考方案1】:

尝试像这样压缩所有助焊剂

Flux.zip(flux1,flux2)

它将创建 Tuple2 以便您可以做 flatMap

谢谢, 维马利什

【讨论】:

【参考方案2】:

忘记我的问题 - 我们可以轻松地使用“flatMap”而不是“map”。 这将解决 Flux 内部的 Flux 问题。

【讨论】:

一直在挠头。感谢您的提示!

以上是关于如何构建避免嵌套 Flux 块(Flux<Flux <T>>)的反应式架构?的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 Flux 使用不同的调度程序运行两个任务

React Flux - 在调度中调度 - 如何避免?

如何在 Flux<String> 上执行 flatMap() 时获取索引号

错字链接的 FLUX 配置

如何使用 Flux 处理有状态的 DOM 元素

ReactJS 嵌套列表 (flux)