Spring Reactive Programming with Webflux - 多个操作作为非阻塞流
Posted
技术标签:
【中文标题】Spring Reactive Programming with Webflux - 多个操作作为非阻塞流【英文标题】:Spring Reactive Programming with Webflux - multiple operations as a non-blocking stream 【发布时间】:2020-03-06 13:54:03 【问题描述】:我有以下代码:
public Flux<Offer> getAllFilteredOffers(Map<String, String> searchParams)
Flux<ProductProperties> productProperties = productPropertiesService.findProductPropertiesBySearchCriteria(searchParams);
Flux<Product> products = productService.findProductsByPropertyId(productProperties);
Flux<Product> productsByAvailability = productService.getAllProductsByAvailability(products, searchParams);
Flux<Offer> offers = offerRepository.findByPropertiesIds(productsByAvailability);
return offers;
这个方法:
productService.getAllProductsByAvailability(products, searchParams);
看起来像:
public Flux<Product> getAllProductsByAvailability(Flux<Product> products,
Map<String, String> searchParams)
如何将List<Product>
传递给getAllProductsByAvailability
以保持非阻塞操作?
我读过地图被阻塞,应该避免。
也许是这样的?
Flux
.just(productPropertiesService.findProductPropertiesBySearchCriteria(searchParams))
.flatMap(productProperties -> productService.findProductsByPropertyId(productProperties))
.flatMap(products -> productService.getAllProductsByAvailability(Flux.create(products)?????????, searchParams))
???
我不是 Webflux 方面的专家,目前我正试图弄清楚如何处理以下问题:我有 Flux,但在第二步中,我需要从之前的 Flex 对象中提取一些数据 - 保持非阻塞流。
比你!
【问题讨论】:
【参考方案1】:我不知道你从哪里读到map
,但是如果你看一下官方文档Webflux map operator 并没有关于阻塞,它只是对每个项目使用同步功能。
使用此代码:
productPropertiesService.findProductPropertiesBySearchCriteria(searchParams)
.flatMap(productProperties -> productService.findProductsByPropertyId(productProperties))
.collectList() (1)
.flatMapMany(products -> productService.getAllProductsByAvailability(Flux.fromIterable(products), searchParams)) (2)
1) 收集所有元素到List并转换为Mono>
2) 从 List 创建 FLux 并将其作为参数提供,flatMapMany 将 Mono 转换为 Flux
【讨论】:
Yauhen Balykin - 整体上是非阻塞操作? 是的,操作符是非阻塞的,但是如果你害怕任何操作阻塞,你可以阅读官方文档。 感谢您的帮助!这正是我一直在寻找的以上是关于Spring Reactive Programming with Webflux - 多个操作作为非阻塞流的主要内容,如果未能解决你的问题,请参考以下文章
[RxJS] Reactive Programming - Sharing network requests with shareReplay()
[RxJS] Reactive Programming - New requests from refresh clicks -- merge()
[RxJS] Reactive Programming - Using cached network data with RxJS -- withLatestFrom()