如何构建避免嵌套 Flux 块(Flux<Flux <T>>)的反应式架构?
Posted
技术标签:
【中文标题】如何构建避免嵌套 Flux 块(Flux<Flux <T>>)的反应式架构?【英文标题】:How to build reactive architecture that will avoid nested Flux blocks (Flux<Flux <T>>)? 【发布时间】:2019-10-16 04:43:41 【问题描述】:我正在尝试构建一个应用程序 A(如适配器),它将:
1) 接收带有一些密钥(JSON 格式)的 POST 请求
2) 它应该以某种方式修改该密钥并创建对另一个系统 B 的 POST 请求。
3) 应用程序 A 应解析来自应用程序 B 的响应并修改该响应。
4) 之后,我的应用程序 A 应该响应初始 POST 请求。
@RestController
@RequestMapping("/A")
public class Controller
@ResponseStatus(HttpStatus.OK)
@PostMapping(value = "B", consumes = APPLICATION_JSON_VALUE)
// to return nested Flux is a bad idea here
private Flux<Flux<Map<String, ResultClass>>> testUpdAcc(@RequestBody Flux<Map<String, SomeClass>> keys)
return someMethod(keys);
// the problem comes here when I will get Flux<Flux<T>> in the return
public Flux<Flux<Map<String, ResultClass>>> someMethod(Flux<Map<String, SomeClass>> keysFlux)
return keysFlux.map(keysMap ->
// do something with keys and create URL
// also will batch keys here
<...>
// for each batch of keys:
WebClient.create(hostAndPort)
.method(HttpMethod.POST)
.uri(url)
.body(BodyInserters.fromObject(body))
.header(HttpHeaders.CONTENT_TYPE, "application/x-www-form-urlencoded")
.accept(MediaType.APPLICATION_JSON)
.retrieve()
.bodyToMono(schema) // response will be parsed into some schema here
.retryWhen (// will make a retry mechanism here)
// ===== will join all Mono batches into single Flux
Flux.concat(...);
);
当然,这可以通过不将 keysFlux 读取为 Flux 并将其读取为 Map 来解决。但这应该让一切变得不那么被动,不是吗? :)
@ResponseStatus(HttpStatus.OK)
@PostMapping(value = "B", consumes = APPLICATION_JSON_VALUE)
// to return nested Flux is a bad idea here
private Flux<Map<String, ResultClass>> testUpdAcc(@RequestBody Map<String, SomeClass> keys)
return someMethod(keys);
我也曾尝试在返回请求前的最后一刻使用 block()/blockFirst(),但出现错误:
block()/blockFirst()/blockLast() are blocking, which is not supported in thread reactor...
感谢您的想法!
【问题讨论】:
直觉 id 说它应该是 Flux 【参考方案1】:尝试像这样压缩所有助焊剂
Flux.zip(flux1,flux2)
它将创建 Tuple2 以便您可以做 flatMap
谢谢, 维马利什
【讨论】:
【参考方案2】:忘记我的问题 - 我们可以轻松地使用“flatMap”而不是“map”。 这将解决 Flux 内部的 Flux 问题。
【讨论】:
一直在挠头。感谢您的提示!以上是关于如何构建避免嵌套 Flux 块(Flux<Flux <T>>)的反应式架构?的主要内容,如果未能解决你的问题,请参考以下文章