单线程 Flux 中的 Mono
Posted
技术标签:
【中文标题】单线程 Flux 中的 Mono【英文标题】:Mono in Flux by single thread 【发布时间】:2018-07-15 06:01:57 【问题描述】:我有来自 db 的 Flux<Foo>
(例如 5 个元素)。
我需要从每个 Foo 中获取一些信息,将其全部设置为 Mono<MyRequest>
,发送到另一个休息资源,获取 Mono<MyResponse>
并在每个 Foo 中使用其中的所有信息。
我在Flux.flatMap()
中使用了很多Mono.zipWith()
和Mono.zipWhen()
,但是创建 MyRequest 并发送到资源发生了 5 次,由 5 个线程发生。
Flux<Foo> flux = dao.getAll();
Flux<Foo> fluxAfterProcessing = flux.flatMap(foo -> monoFilters.map(...));
Mono<Tuple2<MyRequest, MyResponse>> mono =
monoFilters.flatMap(filter -> monoRequest.map(...))
.zipWhen(request -> api.send(request))
.flatMap(tuple -> monoResponseFilters.map(...));
return fluxAfterProcessing.flatMap(foo ->
monoResponseFilters.zipWith(mono).map(...))
如何在 Flux 中仅通过 1 个线程处理我的 Mono 函数一次?
【问题讨论】:
通过应用flatMap
,您实际上确实得到了至少与初始Flux
一样多的发射。我假设你真正想要使用的是collectList(),不是吗?
【参考方案1】:
让我们假设这个任务是这样的:
从数据库中获取一些值 当所有值都到达时,将它们包装在请求中并发送出去 带有响应的压缩结果然后这导致我们是这样的:
Flux<Foo> foos = dao.getAll();
Mono<List<Foo>> everything = foos.collectList();
Mono<MyRequest> request = everything
// collect the data into another Mono, then into request
.map(list -> list.stream().map(Foo::getData).collect(toList()))
.map(data -> new MyRequest(data));
return request.zipWhen(request -> api.send(request));
或者,如果您映射初始 foos
,您可以更轻松地收集构建请求:
Flux<Data> data = dao.getAll().map(Foo::getData);
Mono<MyRequest> request = data.collectList().map(MyRequest::new);
【讨论】:
但我需要从 MyResponse 中返回 FluxMono<T>
的用例。如果你想隐藏你做了多少脉冲,为什么不返回一个Publisher
?无论如何,还有一个Mono::flux
。以上是关于单线程 Flux 中的 Mono的主要内容,如果未能解决你的问题,请参考以下文章