java - 如何在Java响应式中为通量/单声道调用多个服务?

Posted

技术标签:

【中文标题】java - 如何在Java响应式中为通量/单声道调用多个服务?【英文标题】:How to invoke multiple services for a flux / mono in java reactive? 【发布时间】:2021-05-24 16:39:10 【问题描述】:

我是响应式世界的新手,听起来可能是个新手,我有一系列大小为 20-30 的产品,对于每个产品,我需要从不同的微服务中获取以下内容:

    平均评论数 总评论数 wishlistedCount 变种.. .. 6 ..

我尝试过的..

1. doOnNext

Flux<Product> products = ...
products
.doOnNext(product -> updateReviewCount)
.doOnNext(product -> updateTotalCommentCount)
.doOnNext(product -> updateWishlistedCount)
.doOnNext(product -> updateVariants)
...

结果证明,每个产品的每次调用都会阻塞链..

e.g.
Total records(20) * No. of service calls(5) * Time per service calls(30 ms) = 3000ms 

但时间会随着记录的数量而增长||服务调用次数。

2。地图 使用地图我更新并返回相同的参考,但结果是一样的。

3.收集所有作为列表并对下游服务执行聚合查询

Flux<Product> products = ...
products
.collectList() // Mono<List<Product>>
.doOnNext(productList -> updateReviewCountOfAllInList)
.doOnNext(productList -> updateFieldB_ForAllInList)
.doOnNext(productList -> updateFieldC_ForAllInList)
.doOnNext(productList -> updateFieldD_ForAllInList)
...

这确实提高了性能,尽管现在下游应用程序必须为查询返回更多数据,所以下游端增加的时间很少,但没关系。

现在有了这个,我能够实现如下时间...... 总记录(合并为列表,所以 1)* 服务调用次数(5)* 每次服务调用的时间(随着时间增加 50 毫秒)= 250 毫秒

但时间会随着服务调用次数的增加而增加。

现在我需要并行化这些服务调用并并行执行这些服务调用,并在同一产品实例上更新它们各自的字段(相同的引用)。 有些像下面

Flux<Product> products = ... // of 10 products
products
.collectList() // Mono<List<Product>>
.doAllInParallel(serviceCall1, serviceCall2, serviceCall3...)

. // get all updated products // flux size of 10

我想实现时间... 250/5 = 50ms

如何做到这一点? 我发现了不同的文章,但我不确定最好的方法是什么?有人可以帮助我吗?

【问题讨论】:

尝试使用flatMap 而不是doOnNext flatMap 也会阻塞,不会并行调用 【参考方案1】:

它使用了

products // Flux<Product>
.collectList() // Mono<List<Product>>
.flatMap(list -> Mono.zip( this.call1(list) ,this.call2(list) ) ) // will return Tuple
.map(t -> t.getT1) 
.flatMapIterable(i->i)

Mono<Product> call1(List<Product> productList)
   // some logic

Mono<Product> call2(List<Product> productList)
   // some logic

实际上 zip 和 flatmapiterable 也可以一步完成。这里只是为了演示。

【讨论】:

以上是关于java - 如何在Java响应式中为通量/单声道调用多个服务?的主要内容,如果未能解决你的问题,请参考以下文章

从通量转换为单声道

Spring webflux:从请求中消耗单声道或通量

如何使用弹簧反应 webflux 中的单声道和助焊剂使用 DTO 制作新的单声道

如何在不阻塞的情况下渲染具有两个通量场的对象?

如何在Java RS json响应中为简单列表类型设置自定义名称

如何在 java/xuggler 中混音多个音频通道?