使用 Reactive 并行处理 List<Mono<Object>>
Posted
技术标签:
【中文标题】使用 Reactive 并行处理 List<Mono<Object>>【英文标题】:Parallel processing of List<Mono<Object>> using Reactive 【发布时间】:2019-10-27 11:03:53 【问题描述】:我有一个方法可以遍历购物车中的商品并使用 placeOrder 为相同商品下订单。 一旦为购物车中的所有商品调用 placeOrder,我想合并并发送一个单一的 Mono 对象,总结哪些订单通过了哪些订单没有
此代码有效,但未使用 placeOrder 的并行执行。
List<Mono<OrderResponse>> orderResponse = new ArrayList<Mono<OrderResponse>>();
OrderCombinedResponse combinedResponse = new OrderCombinedResponse();
//placeIndividualOrder returns Mono<OrderResponse>
session.getCartItems().forEach(cartItem ->
orderResponse.add(placeIndividualOrder(cartItem)));
return Flux.concat(orderResponse).collectList().map(responseList ->
responseList.forEach(response ->
//Do transformation to separate out failed and successful order
);
//Return Mono<OrderCombinedResponse> object
return combinedResponse;
);
我正在尝试使用下面的代码来并行处理购物车中的订单,但它不返回任何响应而只是退出
//Return Mono<OrderCombinedResponse> object
return Flux.fromIterable(session.getCartItems()).parallel()
//Call method to place order. This method return Mono<OrderResponse>
.map(cartItem -> placeIndividualOrder(cartItem))
.runOn(Schedulers.elastic())
//
.map(r ->
r.subscribe(response ->
//Do transformation to separate out failed and successful order
);
return combinedResponse;
);
【问题讨论】:
【参考方案1】:由于方法placeIndivisualOrder()
返回Mono
,你需要用.flatMap()
调用它。 .runOn()
应该高于对 placeIndivisualOrder()
的调用。如果它继续,就像在上面的代码中一样,你只在调度程序上运行后续的.map()
。最后,不要像你那样在.map()
内部调用subscribe()
,而应该在.flatMap()
之后调用.subscribe()
:
return Flux.fromIterable(session.getCartItems()).parallel()
.runOn(Schedulers.elastic())
//Call method to place order. This method return Mono<OrderResponse>
.flatMap(cartItem -> placeIndividualOrder(cartItem))
.sibscribe(response ->
// do something with response
,
e ->
// catch and report error
)
【讨论】:
以上是关于使用 Reactive 并行处理 List<Mono<Object>>的主要内容,如果未能解决你的问题,请参考以下文章