java - 如何在Java响应式中为通量/单声道调用多个服务?
Posted
技术标签:
【中文标题】java - 如何在Java响应式中为通量/单声道调用多个服务?【英文标题】:How to invoke multiple services for a flux / mono in java reactive? 【发布时间】:2021-05-24 16:39:10 【问题描述】:我是响应式世界的新手,听起来可能是个新手,我有一系列大小为 20-30 的产品,对于每个产品,我需要从不同的微服务中获取以下内容:
-
平均评论数
总评论数
wishlistedCount
变种..
..
6 ..
我尝试过的..
1. doOnNext
Flux<Product> products = ...
products
.doOnNext(product -> updateReviewCount)
.doOnNext(product -> updateTotalCommentCount)
.doOnNext(product -> updateWishlistedCount)
.doOnNext(product -> updateVariants)
...
结果证明,每个产品的每次调用都会阻塞链..
e.g.
Total records(20) * No. of service calls(5) * Time per service calls(30 ms) = 3000ms
但时间会随着记录的数量而增长||服务调用次数。
2。地图 使用地图我更新并返回相同的参考,但结果是一样的。
3.收集所有作为列表并对下游服务执行聚合查询
Flux<Product> products = ...
products
.collectList() // Mono<List<Product>>
.doOnNext(productList -> updateReviewCountOfAllInList)
.doOnNext(productList -> updateFieldB_ForAllInList)
.doOnNext(productList -> updateFieldC_ForAllInList)
.doOnNext(productList -> updateFieldD_ForAllInList)
...
这确实提高了性能,尽管现在下游应用程序必须为查询返回更多数据,所以下游端增加的时间很少,但没关系。
现在有了这个,我能够实现如下时间...... 总记录(合并为列表,所以 1)* 服务调用次数(5)* 每次服务调用的时间(随着时间增加 50 毫秒)= 250 毫秒
但时间会随着服务调用次数的增加而增加。
现在我需要并行化这些服务调用并并行执行这些服务调用,并在同一产品实例上更新它们各自的字段(相同的引用)。 有些像下面
Flux<Product> products = ... // of 10 products
products
.collectList() // Mono<List<Product>>
.doAllInParallel(serviceCall1, serviceCall2, serviceCall3...)
. // get all updated products // flux size of 10
我想实现时间... 250/5 = 50ms
如何做到这一点? 我发现了不同的文章,但我不确定最好的方法是什么?有人可以帮助我吗?
【问题讨论】:
尝试使用flatMap
而不是doOnNext
flatMap 也会阻塞,不会并行调用
【参考方案1】:
它使用了
products // Flux<Product>
.collectList() // Mono<List<Product>>
.flatMap(list -> Mono.zip( this.call1(list) ,this.call2(list) ) ) // will return Tuple
.map(t -> t.getT1)
.flatMapIterable(i->i)
Mono<Product> call1(List<Product> productList)
// some logic
Mono<Product> call2(List<Product> productList)
// some logic
实际上 zip 和 flatmapiterable 也可以一步完成。这里只是为了演示。
【讨论】:
以上是关于java - 如何在Java响应式中为通量/单声道调用多个服务?的主要内容,如果未能解决你的问题,请参考以下文章
如何使用弹簧反应 webflux 中的单声道和助焊剂使用 DTO 制作新的单声道