SpringBoot WebFlux - 制作并行 WebClient 请求

Posted

技术标签:

【中文标题】SpringBoot WebFlux - 制作并行 WebClient 请求【英文标题】:SpringBoot WebFlux - Making parallel WebClient requests 【发布时间】:2018-12-09 15:48:53 【问题描述】:

我正在尝试使用新的 SpringBoot 2 Reactive WebClient 类(它没有批处理端点)对同一个休息服务进行并行(批处理)调用。例如,我需要 100 个“评论”对象(ID 为 1...100)并且我正在执行以下并行调用:

    List<Mono<Comment>> monos = ids.stream()
            .map(id -> webClient.get()
                    .uri("/comments/id", id)
                    .accept(MediaType.APPLICATION_JSON)
                    .retrieve()
                    .bodyToMono(Comment.class))
            .collect(Collectors.toList());

    return Flux.merge(monos);

我是 Spring WebFlux 的新手,我不确定这是使用 WebClient 进行并行调用的正确方法

有没有更好(更合适)的方法来做到这一点(即做一个 Monos 的 Flux concat) ?

另外,当我这样做时,我使用了旧的已弃用的 AsyncRestTemplate ThreadPoolExecutor...我应该使用类似的概念与 网络客户端? ... 是否有与响应式类似的东西?

问候

完整源码可以绑定在:https://github.com/fdlessard/SpringBootReactiveComment

【问题讨论】:

concat 将按顺序发送请求,而不是并行发送:projectreactor.io/docs/core/release/api/reactor/core/publisher/…。合并会这样做:projectreactor.io/docs/core/release/api/reactor/core/publisher/… 【参考方案1】:
Flux.fromIterable(ids)
  .flatMap(id -> webClient.get()
    .uri("/comments/id", id)
    .accept(MediaType.APPLICATION_JSON)
    .retrieve()
    .bodyToMono(Comment.class))
  .subscribeOn(Schedulers.parallel());

【讨论】:

Schedulers.paralell 上安排工作在这里会适得其反;并行调度程序是为受 CPU 限制的工作而设计的,而进行 http 调用可能受延迟限制。响应式流自然会发生并行性,因为消费者 request 的数量取决于其容量。 有时您需要解决原本不是流的问题,因此您将转换为流并并行处理每个项目。您可以使用 Flux,因此应用并行方法,您应该传递您想要的调度程序,例如:弹性、并行、单一等,这取决于您需要解决的问题

以上是关于SpringBoot WebFlux - 制作并行 WebClient 请求的主要内容,如果未能解决你的问题,请参考以下文章

Webflux 并行连接以某种方式限制为 256

为啥 Spring Webflux 只能并行接受 256 个请求?

WebFlux02 SpringBoot WebFlux实现CRUD

SpringBoot使用WebFlux响应式编程操作数据库

SpringBoot2使用WebFlux函数式编程

SpringBoot2 + Webflux - WebTestClient 总是返回“401 UNAUTHORIZED”