如何将 Mono<List<String>> 转换为 Flux<String>

Posted

技术标签:

【中文标题】如何将 Mono<List<String>> 转换为 Flux<String>【英文标题】:How to convert Mono<List<String>> into Flux<String> 【发布时间】:2017-06-19 20:33:17 【问题描述】:

我正在将用 RxJava 1.x 编写的小项目转换为 Reactor 3.x。一切都很好,除了我找不到如何用合适的对应物替换flatMap(Observable::from)。我有Mono&lt;List&lt;String&gt;&gt;,我需要将其转换为Flux&lt;String&gt;

【问题讨论】:

【参考方案1】:

在 Reactor 3 中,from 运算符已被专门化为几个变体,具体取决于原始源(数组、可迭代等...)。

在你的情况下使用yourMono.flatMapMany(Flux::fromIterable)

【讨论】:

方法Mono.flatMap的返回值是Mono,不是Flux。 @SimonBaslé 为什么成员引用运算符在 kotlin 中不起作用? ` Mono.just(listOfElements).flatMapMany(Flux::fromIterable)` //这不起作用`我必须像下面这样写代码Mono.just(listOfElements).flatMapMany Flux.fromIterable(it) ` @rhozet 不知道,这适用于 Java AFAIK 所以...... kotlin 编译器专家的问题? 是的,它可以在 Java 中运行,这可能是编译器专家的问题 :) 我相信 youtrack.jetbrains.com/issue/KT-13003 是函数引用在 Kotlin 中不起作用的原因【参考方案2】:

谢谢西蒙,我实现了这样的东西:

List<Object> dbObjects = ListObjectsBD();
    List<Dao> daos = mapperObjToDao(dbObjects);
    Flux<Dao> daoFlux = Mono.just(daos).flatMapMany(Flux::fromIterable);

【讨论】:

【参考方案3】:

我认为Flux::mergeSequential 静态工厂可能更适合这里:

 Iterable<Mono<String>> monos = ...
 Flux<String> f = Flux.mergeSequential(monos);

这种合并(顺序)将保持给定源可迭代内部的顺序,并且还将从所有参与的源急切地订阅/请求(因此在计算单声道结果​​时预计会有更多的并行化)。

【讨论】:

请注意,Flux.mergeSequential 按顺序订阅每个源,而无需等待其间完成。如果您严格要求下一个 Mono 在前一个完成后发生(例如,您要发布到事件队列和订单问题),您应该使用 Flux.concat(monos)

以上是关于如何将 Mono<List<String>> 转换为 Flux<String>的主要内容,如果未能解决你的问题,请参考以下文章

使用 Reactive 并行处理 List<Mono<Object>>

何时使用 Mono<List<Object>> 以及何时使用 Flux<Object> 用于 RestController 方法

android mono:使用 List<T> 而不是 ArrayAdapter 来使用 Contains 方法

如何将返回 `void` 的同步方法包装到 Mono<Void> 中?

Reactor响应式编程(Mono)

如何从 Mono<ClientResponse> 获取正文?