使用递增的查询参数重复 WebClient 调用

Posted

技术标签:

【中文标题】使用递增的查询参数重复 WebClient 调用【英文标题】:Repeat WebClient call with incremented query parameter 【发布时间】:2019-11-29 04:01:53 【问题描述】:

我正在尝试构建一种方法,该方法应该对一个外部端点执行许多 HTTP 请求,而一个查询参数小于 45000。

我需要这样做,因为外部端点允许我获取 100 个项目,但要获取的项目超过 44000 个。

private int offset = 0;

public Flux<List<Model>> getItems() 
    return Flux.from(
            webClientBuilder
                    .build()
                    .get()
                    .uri(uriBuilder -> uriBuilder
                            .path("/getItems")
                            .queryParam("limit", 100)
                            .queryParam("offset", getOffset())
                            .build())
                    .retrieve()
                    .bodyToMono(Model.class)
                    .doOnSuccess(System.out::println)
                    .flatMap(model -> 
                        setOffset(getOffset() + 100);
                        log.info("Offset: " + getOffset());
                        return repository.saveAll(model.getData().getResults()).collectList();
                    ).delayElement(Duration.ofSeconds(15)))
                    .repeat(() -> getOffset() <= 45000);


public int getOffset() 
    return offset;


public void setOffset(int offset) 
    this.offset = offset;

这似乎有效,因为记录了偏移量参数递增但 HTTP 请求的偏移量等于 0。该方法返回前 100 个项目而不是 44566 个项目

【问题讨论】:

我只是猜测,但重复我认为重复订阅从 webclient 获取的通量。不要认为它会重复整个通话。我会在 doOnSuccess 中再次调用该方法的递归方法 这可能是个好主意,我也会测试一下,谢谢你的回答。 【参考方案1】:

问题实际上是,webclient 是在订阅之前急切构建的,并且“缓存”,初始值为 offset。每次调用后,Flux 被重新订阅,但准备好的带有偏移量的 Web 服务调用仍然“缓存”。 您必须以惰性方式提供weblient(例如通过将其包装在 lambda 中),这会强制为每次调用重新计算其所有参数。有一个特殊的运算符 - defer()

解决方案

Mono<Model> response = Mono.defer(() -> webClientBuilder
        .build()
        .get()
        .uri(uriBuilder -> uriBuilder
                .path("/getItems")
                .queryParam("limit", 100)
                .queryParam("offset", getOffset())
                .build())
        .retrieve()
        .bodyToMono(Model.class)
);


Flux.from(response
        .doOnEach(System.out::println)
        .flatMap(model -> 
            setOffset(getOffset() + 100);
            log.info("Offset: " + getOffset());
            return repository.saveAll(model.getData().getResults()).collectList();
        ).delayElement(Duration.ofSeconds(15))
).repeat(() -> getOffset() <= 45000).subscribe();

另一个问题表明与急切执行相同的问题: Mono switchIfEmpty() is always called

【讨论】:

感谢您的回答,效果很好,正是我想要做的。 谢谢

以上是关于使用递增的查询参数重复 WebClient 调用的主要内容,如果未能解决你的问题,请参考以下文章

WebClient 编码 queryParams spring

使用 SpringFlux 的 webclient 重复 Mono

webclient buffer异常

LoadRunner能否对请求参数设定递增值?

如何在mysql,php的单个查询中设置与自动递增ID值相同的任何列值[重复]

Spring Boot WebClient.Builder bean 在传统 servlet 多线程应用程序中的使用