单线程 Flux 中的 Mono

Posted

技术标签:

【中文标题】单线程 Flux 中的 Mono【英文标题】:Mono in Flux by single thread 【发布时间】:2018-07-15 06:01:57 【问题描述】:

我有来自 db 的 Flux<Foo>(例如 5 个元素)。 我需要从每个 Foo 中获取一些信息,将其全部设置为 Mono<MyRequest>,发送到另一个休息资源,获取 Mono<MyResponse> 并在每个 Foo 中使用其中的所有信息。

我在Flux.flatMap() 中使用了很多Mono.zipWith()Mono.zipWhen()但是创建 MyRequest 并发送到资源发生了 5 次,由 5 个线程发生。

Flux<Foo> flux = dao.getAll();
Flux<Foo> fluxAfterProcessing = flux.flatMap(foo -> monoFilters.map(...));
Mono<Tuple2<MyRequest, MyResponse>> mono = 
                      monoFilters.flatMap(filter -> monoRequest.map(...))
                                 .zipWhen(request -> api.send(request))
                                 .flatMap(tuple -> monoResponseFilters.map(...));
return fluxAfterProcessing.flatMap(foo -> 
                       monoResponseFilters.zipWith(mono).map(...))

如何在 Flux 中仅通过 1 个线程处理我的 Mono 函数一次?

【问题讨论】:

通过应用flatMap,您实际上确实得到了至少与初始Flux 一样多的发射。我假设你真正想要使用的是collectList(),不是吗? 【参考方案1】:

让我们假设这个任务是这样的:

从数据库中获取一些值 当所有值都到达时,将它们包装在请求中并发送出去 带有响应的压缩结果

然后这导致我们是这样的:

Flux<Foo> foos = dao.getAll();
Mono<List<Foo>> everything = foos.collectList();

Mono<MyRequest> request = everything
    // collect the data into another Mono, then into request
    .map(list -> list.stream().map(Foo::getData).collect(toList()))
    .map(data -> new MyRequest(data));

return request.zipWhen(request -> api.send(request));

或者,如果您映射初始 foos,您可以更轻松地收集构建请求:

Flux<Data> data = dao.getAll().map(Foo::getData);
Mono<MyRequest> request = data.collectList().map(MyRequest::new); 

【讨论】:

但我需要从 MyResponse 中返回 Flux 的更改 @MaksymShamanovskyi 为什么?如果你真的需要发射几个脉冲,那么这就是原始代码所做的,所以没有问题。如果您确实只需要发射一次,那么这就是Mono&lt;T&gt; 的用例。如果你想隐藏你做了多少脉冲,为什么不返回一个Publisher?无论如何,还有一个Mono::flux

以上是关于单线程 Flux 中的 Mono的主要内容,如果未能解决你的问题,请参考以下文章

如何将通量元素转换为键映射单声道

c#中Timer是单线程还是多线程

了解 Flux Facebook 试图解决的问题

单声道下的堆栈大小

单核和多核,单进程和多进程,单线程与多线程

单线程应用程序中的死锁