使用 SpringFlux 的 webclient 重复 Mono

Posted

技术标签:

【中文标题】使用 SpringFlux 的 webclient 重复 Mono【英文标题】:Repeat Mono with webclient from SpringFlux 【发布时间】:2020-07-31 10:33:17 【问题描述】:

情况如下: 我发送了第一个请求,然后我以 5s 的间隔重复发送了第二个请求。如果第二个请求获得成功响应,我将其保存在数据库中并执行一些操作,如果它获得不成功(完成)响应,则应重复请求。当尝试大于 3 时,我需要停止重复第二个请求。我该怎么做?

     return firstRequestSenderService.send(request)
        .flatMap(resp -> 
          AtomicInteger attempts = new AtomicInteger(0);
          String url = normalizeUrl(resp.getResult());
          return Mono.defer(() -> 
            log.info("Second request, attempt = , params = ", attempts.get(), param);
            return secondRequestSenderService.send(param, url, attempts.getAndIncrement());
          )
              .filter(this::isCompleteResponse)
              // i try .filter(b -> attempts.get() > 2)
              .doOnNext(r -> log.info("Save report"))
              .map(secondResp -> dataSaver.saveReport(param, secondResp))
              .doOnNext(r -> log.info("Send request to another service"))
              .flatMap(r -> secondRequestSender.sendPdf(r)))
              .doOnNext(bytes -> dataSaver.saveAnotherReport(param, bytes))
              .repeatWhenEmpty(req -> Flux.interval(Duration.ofSeconds(5)));
// also try   .repeatWhenEmpty(3, req -> Flux.interval(Duration.ofSeconds(5)));
        )
        .then(Mono.empty());

【问题讨论】:

【参考方案1】:

检查reactor-addons 你可以做这样的事情

    return firstRequestSenderService.send(request)
        .flatMap(resp -> 
          return secondRequestSenderService
                  .send(param, url, attempts.getAndIncrement())
                  .retryWhen(Retry.any()
                          .randomBackoff(Duration.ofMillis(100), Duration.ofMillis(5000))
                          .retryMax(3));

        );

你也可以使用 reactor core retry utils RetryBackoffSpec 来做到这一点,当你使用最新的 spring webflux 时你会得到这个

  RetryBackoffSpec retryBackoffSpec = Retry
        .fixedDelay(3, Duration.ofSeconds(5));
return firstRequestSenderService.send(request)
        .flatMap(resp -> 
                  return secondRequestSenderService
                          .send(param, url, attempts.getAndIncrement())
                          .retryWhen(retryBackoffSpec);
                );

【讨论】:

谢谢,它有效)但是如果我不能使用这个额外的库,我该如何在基本方法的帮助下做到这一点? 谢谢,最新版本有这个类真的很好)但是在以前的版本中没有方法可以做到这一点。我想看看如何在基本方法的帮助下完成它。为什么例如这个版本.repeatWhenEmpty(3, req -> Flux.interval(Duration.ofSeconds(5))); 不起作用?

以上是关于使用 SpringFlux 的 webclient 重复 Mono的主要内容,如果未能解决你的问题,请参考以下文章

WebClient - 如何获取请求正文?

Spring Flux:如果我需要使用 Flux 和 Spring 5 进行并行 Web 服务调用,实现模式会是啥样子

如何向 WebClient (C#) 添加证书?

使用WebClient实现断点续传 重写

使用WebClient异步获取http资源

使用 ssl 的 Spring 5 WebClient