如何并行进行多个 Spring Webclient 调用并等待结果?

Posted

技术标签:

【中文标题】如何并行进行多个 Spring Webclient 调用并等待结果?【英文标题】:How to make multiple Spring Webclient calls in parallel and wait for the result? 【发布时间】:2019-10-21 09:32:51 【问题描述】:

我是响应式编程的新手,我想并行进行两个 API 调用并处理结果并返回一个简单的数组或项目列表。

我有两个函数,一个返回 Flux,另一个返回 Mono,我根据 Mono 的结果对 Flux 发出的项进行了非常简单的过滤逻辑。

我尝试使用zipWith,但无论采用何种过滤逻辑,都只有一项完成。我也尝试使用block,但控制器内部不允许这样做:/

@GetMapping("/id/offers")
fun viewTaskOffers(
        @PathVariable("id") id: String,
        @AuthenticationPrincipal user: UserPrincipal
) : Flux<ViewOfferDTO> 
    data class TaskOfferPair(
        val task: TaskDTO,
        val offer: ViewOfferDTO
    )

    return client.getTaskOffers(id).map 
            it.toViewOfferDTO()
        .zipWith(client.getTask(id), BiFunction 
            offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
        ).filter 
            it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
        .map 
            it.offer
        

getTaskOffers 返回 OfferDTO 的通量 getTask 返回 TaskDTO 的 Mono

如果您不能回答我的问题,请至少告诉我如何并行执行多个 API 调用并在 WebClient 中等待结果

【问题讨论】:

【参考方案1】:

这是一个并行调用的用例。

public Mono<UserInfo> fetchCarrierUserInfo(User user) 
        Mono<UserInfo> userInfoMono = fetchUserInfo(user.getGuid());
        Mono<CarrierInfo> carrierInfoMono = fetchCarrierInfo(user.getCarrierGuid());

        return Mono.zip(userInfoMono, carrierInfoMono).map(tuple -> 
            UserInfo userInfo = tuple.getT1();
            userInfo.setCarrier(tuple.getT2());
            return userInfo;
        );
    

这里:

fetchUserInfo 进行 http 调用以从另一个服务获取用户信息并返回 Mono fetchCarrierInfo 方法通过 HTTP 调用从另一个服务获取carrierInfo 并返回Mono Mono.zip() 将给定的 Monos 合并到一个新的 Mono 中,当所有给定的 Monos 都产生了一个项目时,新的 Mono 将被实现,并将它们的值聚合到一个 Tuple2 中。

然后,调用fetchCarrierUserInfo().block()得到最终结果。

【讨论】:

当这些 Mono 无效时我可以使用什么。谢谢。我从这里得到这些: Mono values= Mono.fromRunnable(() -> setvalues(x, y, z));【参考方案2】:

正如您已经知道的那样,zipWith 不会帮助您,因为它会产生 min(a.size, b.size),如果其中一个是 Mono,则它始终为 1。

但是由于这两个是独立的,您可以简单地将它们拆分:

val task: Mono<TaskDTO> = client.getTask(id)
val result: Flux<ViewOfferDTO> = 
task.flatMapMany t ->
        client.getTaskOffers(id).map offer ->
            t to offer
        
    .filter 
        it.second.workerUser.id == user.id || it.first.creatorUser == user.id
    .map 
        it.second

注意,如果你想要一对元素,你可以使用内置的Pair

另外,这个检查没有多大意义,因为你只有Monoit.first.creatorUser

【讨论】:

同样getTaskOffers 返回一个Flux,所以映射它会返回一个单一的报价,而不是报价 请记住,我没有你的实际代码,所以我不能 100% 准确。不过,我扩展了我的答案。希望对您有所帮助。【参考方案3】:

使用repeat() 将您的 Mono 转换为 Flux:

client.getTask(id).cache().repeat();

所以你的代码会变成

    return client.getTaskOffers(id).map 
        it.toViewOfferDTO()
    .zipWith(client.getTask(id).cache().repeat(), BiFunction 
        offer: ViewOfferDTO, task: TaskDTO -> TaskOfferPair(task, offer)
    ).filter 
        it.offer.workerUser.id == user.id || it.task.creatorUser == user.id
    .map 
        it.offer
    

【讨论】:

以上是关于如何并行进行多个 Spring Webclient 调用并等待结果?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Spring WebClient 同时进行多个调用?

如何使用 Spring5 WebClient 进行异步调用

使用 Spring WebClient 发出多个请求

如何使用 Spring webflux 向 webclient 发送实时进度?

SpringBoot WebFlux - 制作并行 WebClient 请求

Spring Boot WebClient OAuth - 同时命中多个请求时超时