从 Mono 的列表中创建 Flux 的正确方法
Posted
技术标签:
【中文标题】从 Mono 的列表中创建 Flux 的正确方法【英文标题】:Proper way to create a Flux from a list of Mono's 【发布时间】:2019-01-21 10:47:00 【问题描述】:假设我有一个使用自定义对象列表的 API 操作。对于这些对象中的每一个,它都会调用一个创建 Mono 的服务方法。如何以惯用且非阻塞的方式从这些 Mono 对象创建 Flux?
我现在想出的是这个。我更改了方法名称以更好地反映其预期目的。
fun myApiMethod(@RequestBody customObjs: List<CustomObject>): Flux<CustomObject>
return Flux.create sink ->
customObjs.forEach
service.persistAndReturnMonoOfCustomObject(it).map
sink.next(it)
sink.complete()
此外,我是否需要订阅通量才能真正让它返回一些东西?
【问题讨论】:
@artem-bilan 给出的提示成功了。 这段代码是不是完全错误?大概 persistAndReturnMonoOfCustomObject 不会阻塞,因此所有 foreach 块都会立即执行,然后调用 sink.complete()。所以后面所有的 sink.next(it) (只有在persistAndReturnMonoOfCustomObject返回的mono完成时才执行)没有效果? 【参考方案1】:我相信你可以改用concat()
:
/**
* Concatenate all sources provided as a vararg, forwarding elements emitted by the
* sources downstream.
* <p>
* Concatenation is achieved by sequentially subscribing to the first source then
* waiting for it to complete before subscribing to the next, and so on until the
* last source completes. Any error interrupts the sequence immediately and is
* forwarded downstream.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/concat.png" >
* <p>
* @param sources The @link Publisher of @link Publisher to concat
* @param <T> The type of values in both source and output sequences
*
* @return a new @link Flux concatenating all source sequences
*/
@SafeVarargs
public static <T> Flux<T> concat(Publisher<? extends T>... sources)
或merge()
:
/**
* Merge data from @link Publisher sequences contained in an array / vararg
* into an interleaved merged sequence. Unlike @link #concat(Publisher) concat,
* sources are subscribed to eagerly.
* <p>
* <img class="marble" src="https://raw.githubusercontent.com/reactor/reactor-core/v3.1.3.RELEASE/src/docs/marble/merge.png" >
* <p>
* Note that merge is tailored to work with asynchronous sources or finite sources. When dealing with
* an infinite source that doesn't already publish on a dedicated Scheduler, you must isolate that source
* in its own Scheduler, as merge would otherwise attempt to drain it before subscribing to
* another source.
*
* @param sources the array of @link Publisher sources to merge
* @param <I> The source type of the data sequence
*
* @return a merged @link Flux
*/
@SafeVarargs
public static <I> Flux<I> merge(Publisher<? extends I>... sources)
【讨论】:
以上是关于从 Mono 的列表中创建 Flux 的正确方法的主要内容,如果未能解决你的问题,请参考以下文章
合并两个 Mono 并获得一个 Flux。然后从那个 Flux 中提取一个 Mono