从 Mono 的列表中创建 Flux 的正确方法

Posted

技术标签:

【中文标题】从 Mono 的列表中创建 Flux 的正确方法【英文标题】:Proper way to create a Flux from a list of Mono's 【发布时间】:2019-01-21 10:47:00 【问题描述】:

假设我有一个使用自定义对象列表的 API 操作。对于这些对象中的每一个,它都会调用一个创建 Mono 的服务方法。如何以惯用且非阻塞的方式从这些 Mono 对象创建 Flux?

我现在想出的是这个。我更改了方法名称以更好地反映其预期目的。

fun myApiMethod(@RequestBody customObjs: List<CustomObject>): Flux<CustomObject> 

    return Flux.create  sink ->
        customObjs.forEach 

            service.persistAndReturnMonoOfCustomObject(it).map 
                sink.next(it)
            
        
        sink.complete()
    

此外,我是否需要订阅通量才能真正让它返回一些东西?

【问题讨论】:

@artem-bilan 给出的提示成功了。 这段代码是不是完全错误?大概 persistAndReturnMonoOfCustomObject 不会阻塞,因此所有 foreach 块都会立即执行,然后调用 sink.complete()。所以后面所有的 sink.next(it) (只有在persistAndReturnMonoOfCustomObject返回的mono完成时才执行)没有效果? 【参考方案1】:

我相信你可以改用concat()

/**
 * Concatenate all sources provided as a vararg, forwarding elements emitted by the
 * sources downstream.
 * <p>
 * Concatenation is achieved by sequentially subscribing to the first source then
 * waiting for it to complete before subscribing to the next, and so on until the
 * last source completes. Any error interrupts the sequence immediately and is
 * forwarded downstream.
 * <p>
 * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/concat.png" >
 * <p>
 * @param sources The @link Publisher of @link Publisher to concat
 * @param <T> The type of values in both source and output sequences
 *
 * @return a new @link Flux concatenating all source sequences
 */
@SafeVarargs
public static <T> Flux<T> concat(Publisher<? extends T>... sources) 

merge():

/**
 * Merge data from @link Publisher sequences contained in an array / vararg
 * into an interleaved merged sequence. Unlike @link #concat(Publisher) concat,
 * sources are subscribed to eagerly.
 * <p>
 * <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/merge.png" >
 * <p>
 * Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
 * an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
 * in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
 * another source.
 *
 * @param sources the array of @link Publisher sources to merge
 * @param <I> The source type of the data sequence
 *
 * @return a merged @link Flux
 */
@SafeVarargs
public static <I> Flux<I> merge(Publisher<? extends I>... sources) 

【讨论】:

以上是关于从 Mono 的列表中创建 Flux 的正确方法的主要内容,如果未能解决你的问题,请参考以下文章

合并两个 Mono 并获得一个 Flux。然后从那个 Flux 中提取一个 Mono

无法从 Flux / Mono 读取值

使用 python 从时间列表中创建平均值

Spring Webflux(Mono/Flux) 与 AOP 在拦截时触发 REST 调用并使用 Mono/Flux

单线程 Flux 中的 Mono

如何结合 Mono 和 Flux 作为参数来创建新的 Mono?