Spring Reactive Programming with Webflux - 多个操作作为非阻塞流

Posted

技术标签:

【中文标题】Spring Reactive Programming with Webflux - 多个操作作为非阻塞流【英文标题】:Spring Reactive Programming with Webflux - multiple operations as a non-blocking stream 【发布时间】:2020-03-06 13:54:03 【问题描述】:

我有以下代码:

public Flux<Offer> getAllFilteredOffers(Map<String, String> searchParams) 

    Flux<ProductProperties> productProperties = productPropertiesService.findProductPropertiesBySearchCriteria(searchParams);
    Flux<Product> products = productService.findProductsByPropertyId(productProperties);
    Flux<Product> productsByAvailability = productService.getAllProductsByAvailability(products, searchParams);
    Flux<Offer> offers = offerRepository.findByPropertiesIds(productsByAvailability);
    return offers;

这个方法:

productService.getAllProductsByAvailability(products, searchParams);

看起来像:

public Flux<Product> getAllProductsByAvailability(Flux<Product> products,
            Map<String, String> searchParams) 

如何将List&lt;Product&gt; 传递给getAllProductsByAvailability 以保持非阻塞操作? 我读过地图被阻塞,应该避免。 也许是这样的?

    Flux
                    .just(productPropertiesService.findProductPropertiesBySearchCriteria(searchParams))
                    .flatMap(productProperties -> productService.findProductsByPropertyId(productProperties))
                    .flatMap(products -> productService.getAllProductsByAvailability(Flux.create(products)?????????, searchParams))
???

我不是 Webflux 方面的专家,目前我正试图弄清楚如何处理以下问题:我有 Flux,但在第二步中,我需要从之前的 Flex 对象中提取一些数据 - 保持非阻塞流。

比你!

【问题讨论】:

【参考方案1】:

我不知道你从哪里读到map,但是如果你看一下官方文档Webflux map operator 并没有关于阻塞,它只是对每个项目使用同步功能。

使用此代码:

productPropertiesService.findProductPropertiesBySearchCriteria(searchParams)
                .flatMap(productProperties -> productService.findProductsByPropertyId(productProperties))
                .collectList()   (1)
                .flatMapMany(products -> productService.getAllProductsByAvailability(Flux.fromIterable(products), searchParams)) (2)

1) 收集所有元素到List并转换为Mono>

2) 从 List 创建 FLux 并将其作为参数提供,flatMapMany 将 Mono 转换为 Flux

【讨论】:

Yauhen Balykin - 整体上是非阻塞操作? 是的,操作符是非阻塞的,但是如果你害怕任何操作阻塞,你可以阅读官方文档。 感谢您的帮助!这正是我一直在寻找的

以上是关于Spring Reactive Programming with Webflux - 多个操作作为非阻塞流的主要内容,如果未能解决你的问题,请参考以下文章

[RxJS] Reactive Programming - Sharing network requests with shareReplay()

[RxJS] Reactive Programming - New requests from refresh clicks -- merge()

[RxJS] Reactive Programming - Using cached network data with RxJS -- withLatestFrom()

Spring boot加载REACTIVE程序过程

Spring Boot Reactive Streams

Spring 5 Reactive - WebExceptionHandler 没有被调用