合并两个 Mono 并获得一个 Flux。然后从那个 Flux 中提取一个 Mono

Posted

技术标签:

【中文标题】合并两个 Mono 并获得一个 Flux。然后从那个 Flux 中提取一个 Mono【英文标题】:Merging two Mono and getting a Flux. Then extracting a Mono from that Flux 【发布时间】:2021-07-31 07:14:16 【问题描述】:

我有两个 Mono<T>,我从两个不同的来源让我们说KAFKA

我的意图是将 这两个 Mono 合并为 Flux<T>1

然后使用Flux中的public final Mono<T> reduce(BiFunction<T,T,T> aggregator)方法创建一个最终的Mono(因为上面两个Mono的响应时间可能会有所不同)。 2

方法:

contactzipzipWith 等多种方法可用于Flux。我如何找到正确的使用方法(两次MonoFlux 转换,即1)。

这种REDUCE 的方法真的正确吗,或者还有什么其他方法可以即兴发挥吗(2)?谢谢。

【问题讨论】:

【参考方案1】:

如果您真的想使用Flux 来执行此操作,那么您可能希望使用merge(),类似于:

Flux.merge(mono1(), mono2()).reduce((obj1, obj2) -> foo(obj1, obj2));

...其中foo() 实现了问题中reduce 方法的功能,将发出的两个对象组合成一个值。你不会想使用concat(),除非你想一次订阅每个Mono,等待每个完成,而不是一起完成——并且Flux.zipXXX系列运算符将用于单独压缩流动在一起,所以你不会想要那个。

但是,我认为您在此处对两个值没有正确的方法 - 如果您想将两个 Mono 发布者放入 Flux,然后立即将它们减少回 Mono,那么它使用 Flux 根本没有多大意义,因为您必须等待两个发布者都完成后才能发出任何内容,然后您只会发出一个值。

相反,我建议使用this variant of Mono.zip(),它可以让您一次性完成所需的一切,例如:

Mono.zip(mono1(), mono2(), (obj1, obj2) -> foo(obj1, obj2));

【讨论】:

谢谢@Michael。我的要求是获得两个Mono<ValidationRequest>,它可能比这可能更多,让我们说输入ValidationRequest。我的想法是将所有这些组合起来形成一个Flux<ValidationRequest>,然后将Flux<ValidationRequest> 减少到Mono<ValidationResponse> @nihar 我已经用Flux 更新了答案,如果你想采用这种方法的话。 谢谢@Michael

以上是关于合并两个 Mono 并获得一个 Flux。然后从那个 Flux 中提取一个 Mono的主要内容,如果未能解决你的问题,请参考以下文章

无法从 Flux / Mono 读取值

Spring Webflux(Mono/Flux) 与 AOP 在拦截时触发 REST 调用并使用 Mono/Flux

Reactor响应式编程(Mono)

使用 SpringFlux 的 webclient 重复 Mono

如何结合 Mono 和 Flux 作为参数来创建新的 Mono?

我对响应式编程中Mono和Flux的理解