合并两个 Mono 并获得一个 Flux。然后从那个 Flux 中提取一个 Mono
Posted
技术标签:
【中文标题】合并两个 Mono 并获得一个 Flux。然后从那个 Flux 中提取一个 Mono【英文标题】:Merging two Mono and getting a Flux. Then extracting a Mono from that Flux 【发布时间】:2021-07-31 07:14:16 【问题描述】:我有两个 Mono<T>
,我从两个不同的来源让我们说KAFKA
。
我的意图是将 这两个 Mono
合并为 Flux<T>
。 1
然后使用Flux
中的public final Mono<T> reduce(BiFunction<T,T,T> aggregator)
方法创建一个最终的Mono
(因为上面两个Mono
的响应时间可能会有所不同)。 2
方法:
contact
、zip
、zipWith
等多种方法可用于Flux
。我如何找到正确的使用方法(两次Mono
到Flux
转换,即1)。
这种REDUCE
的方法真的正确吗,或者还有什么其他方法可以即兴发挥吗(2)?谢谢。
【问题讨论】:
【参考方案1】:如果您真的想使用Flux
来执行此操作,那么您可能希望使用merge()
,类似于:
Flux.merge(mono1(), mono2()).reduce((obj1, obj2) -> foo(obj1, obj2));
...其中foo()
实现了问题中reduce
方法的功能,将发出的两个对象组合成一个值。你不会想使用concat()
,除非你想一次订阅每个Mono
,等待每个完成,而不是一起完成——并且Flux.zipXXX
系列运算符将用于单独压缩流动在一起,所以你不会想要那个。
但是,我认为您在此处对两个值没有正确的方法 - 如果您想将两个 Mono
发布者放入 Flux
,然后立即将它们减少回 Mono
,那么它使用 Flux
根本没有多大意义,因为您必须等待两个发布者都完成后才能发出任何内容,然后您只会发出一个值。
相反,我建议使用this variant of Mono.zip()
,它可以让您一次性完成所需的一切,例如:
Mono.zip(mono1(), mono2(), (obj1, obj2) -> foo(obj1, obj2));
【讨论】:
谢谢@Michael。我的要求是获得两个Mono<ValidationRequest>
,它可能比这可能更多,让我们说输入ValidationRequest
。我的想法是将所有这些组合起来形成一个Flux<ValidationRequest>
,然后将Flux<ValidationRequest>
减少到Mono<ValidationResponse>
@nihar 我已经用Flux
更新了答案,如果你想采用这种方法的话。
谢谢@Michael以上是关于合并两个 Mono 并获得一个 Flux。然后从那个 Flux 中提取一个 Mono的主要内容,如果未能解决你的问题,请参考以下文章
Spring Webflux(Mono/Flux) 与 AOP 在拦截时触发 REST 调用并使用 Mono/Flux
使用 SpringFlux 的 webclient 重复 Mono