Webflux 并行连接以某种方式限制为 256

Posted

技术标签:

【中文标题】Webflux 并行连接以某种方式限制为 256【英文标题】:Webflux parallel connections somehow limited to 256 【发布时间】:2019-02-05 11:33:57 【问题描述】:

我有一个简单的服务器和客户端设置:

Flux.range(1, 5000)
        .subscribeOn(Schedulers.parallel())
        .flatMap(i -> WebClient.create()
            .method(HttpMethod.POST)
            .uri("http://localhost:8080/test")
            .body(Mono.just(String.valueOf(i)), String.class)
            .exchange())
        .publishOn(Schedulers.parallel())
        .subscribe(response ->
            response.bodyToMono(String.class)
                .publishOn(Schedulers.elastic())
                .subscribe(body -> log.info("", body)));

这里是客户端:

@PostMapping
public Mono<String> test(@RequestBody Mono<String> body) 
    return body.delayElement(Duration.ofSeconds(5));

这两件事都在 netty 上运行。也许有人知道是什么导致了这种行为?

【问题讨论】:

【参考方案1】:

这不是因为WebClient 对连接池的限制,但这实际上来自您可以更改的 Reactor 实现细节。

默认情况下,flatMap 等 Reactor 运算符具有 prefetch=32(在最终订阅者请求之前我们请求的元素数量)和 maxConcurrency=256(运算符同时处理的最大元素数量)。

您可以使用Flux.flatMap(Function mapper, int concurrency, int prefetch) 的变体来改变这种行为。

您的代码 sn-p 混合使用了subscribeOnpublishOn;我想说,鉴于您正在使用此代码 sn-p 进行反应式 I/O 工作,您不应该尝试在弹性/并行调度程序上安排工作。最好在这里删除这些运算符。

【讨论】:

谢谢,我想我应该在做其他事情的时候离开 http nio 线程。

以上是关于Webflux 并行连接以某种方式限制为 256的主要内容,如果未能解决你的问题,请参考以下文章

DataBufferLimitException:超过最大字节数限制以缓冲 webflux 错误

如何将 LINQ 左外连接限制为一行

如何以某种并行的方式有效地计算两个数组列表的外积?

SpringBoot WebFlux - 制作并行 WebClient 请求

使用 Spring Webflux 返回元素列表

oracle表之间的连接