使用 Reactive 并行处理 List<Mono<Object>>

Posted

技术标签:

【中文标题】使用 Reactive 并行处理 List<Mono<Object>>【英文标题】:Parallel processing of List<Mono<Object>> using Reactive 【发布时间】:2019-10-27 11:03:53 【问题描述】:

我有一个方法可以遍历购物车中的商品并使用 placeOrder 为相同商品下订单。 一旦为购物车中的所有商品调用 placeOrder,我想合并并发送一个单一的 Mono 对象,总结哪些订单通过了哪些订单没有

此代码有效,但未使用 placeOrder 的并行执行。

List<Mono<OrderResponse>> orderResponse = new ArrayList<Mono<OrderResponse>>();
        OrderCombinedResponse combinedResponse = new OrderCombinedResponse();
//placeIndividualOrder returns Mono<OrderResponse>
        session.getCartItems().forEach(cartItem ->
          orderResponse.add(placeIndividualOrder(cartItem)));

return Flux.concat(orderResponse).collectList().map(responseList -> 
            responseList.forEach(response -> 
//Do transformation to separate out failed and successful order

            );
//Return Mono<OrderCombinedResponse> object
            return combinedResponse;
        );

我正在尝试使用下面的代码来并行处理购物车中的订单,但它不返回任何响应而只是退出

//Return Mono<OrderCombinedResponse> object 
return Flux.fromIterable(session.getCartItems()).parallel()
//Call method to place order. This method return Mono<OrderResponse>
.map(cartItem -> placeIndividualOrder(cartItem))
.runOn(Schedulers.elastic())
//
.map(r -> 
                    r.subscribe(response -> 
                        //Do transformation to separate out failed and successful order

                    );
                    return combinedResponse;
                );

【问题讨论】:

【参考方案1】:

由于方法placeIndivisualOrder()返回Mono,你需要用.flatMap()调用它。 .runOn() 应该高于对 placeIndivisualOrder() 的调用。如果它继续,就像在上面的代码中一样,你只在调度程序上运行后续的.map()。最后,不要像你那样在.map() 内部调用subscribe(),而应该在.flatMap() 之后调用.subscribe()

return Flux.fromIterable(session.getCartItems()).parallel()
    .runOn(Schedulers.elastic())
    //Call method to place order. This method return Mono<OrderResponse>
    .flatMap(cartItem -> placeIndividualOrder(cartItem))
    .sibscribe(response -> 
         // do something with response
    ,
    e -> 
         // catch and report error
    ) 

【讨论】:

以上是关于使用 Reactive 并行处理 List<Mono<Object>>的主要内容,如果未能解决你的问题,请参考以下文章

Stream多线程并行数据处理

SpringBoot WebFlux - 制作并行 WebClient 请求

System.Reactive - 一次处理未知数量的订阅

在 System.Reactive 的订阅中处理异常

Rxjava并行执行耗时操作使用zip和merge

Rxjava并行执行耗时操作使用zip和merge