在获得某个值之前,如何继续调用 API?

Posted

技术标签:

【中文标题】在获得某个值之前,如何继续调用 API?【英文标题】:How can you keep calling API until you get a certain value? 【发布时间】:2021-10-05 14:07:13 【问题描述】:

我是响应式编程和 WebClient 的新手。我想重复调用一个 API,直到我从 API 获得特定值(我正在调用的 API 是为事件流配置的常规 API)。一旦我得到想要的值,我就会采取行动。

我尝试使用 Flux 间隔解决此问题,但运气不佳。我不确定我的代码是否有问题,或者我是否采用了错误的方法。

public Flux<TransactionStatus> watchAndTransact(Duration intervalDuration, Duration watchDuration,
                                                String toAddress, String fromAddress) 
    return Flux.interval(intervalDuration)
            .take(watchDuration)
            .flatMap(aFloat -> getBalance(fromAddress))
            //.retryWhen(getBalance(fromAddress).block() < 0)
            .flatMap(balance -> 

                log.info("Checking balance ", Instant.now());
                //If we find that an amount was deposited into the deposit account
                if (balance > 0) 
                    log.info("Balance Present ", balance.toString());

                    //Build a transaction
                    Transaction transaction = new Transaction(fromAddress, toAddress, balance);

                    //Post the transaction
                    return postTransactionService.postTransaction(transaction);

                
                return Mono.empty();
            ).switchIfEmpty(s -> s.onError(new RuntimeException()));

我正在使用以下代码从 API 获取余额:

public Mono<AddressInfo> getAddressInfo(String address) 
    log.info("Pulling information on the address ", address);

    return webConfig.api
            .get()
            .uri("/addresses/address", address)
            .retrieve()
                .onStatus(HttpStatus::is4xxClientError,
                        error -> Mono.error(new NotFoundResourceException("Couldn't find a record")))
                .onStatus(HttpStatus::is5xxServerError,
                        error -> Mono.error(new BadRequestException("Server isn't responding")))
            .bodyToMono(AddressInfo.class)
            .onErrorReturn(new AddressInfo(0f, EMPTY_LIST));


public Mono<Float> getBalance(String address) 
    return getAddressInfo(address).map(AddressInfo::getBalance);

这就是我设置网络客户端的方式:

@Bean
@LoadBalanced
public WebClient.Builder loadBalancedWebClientBuilder() 

    return WebClient.builder();


public WebClient dbClient = loadBalancedWebClientBuilder()
        .baseUrl(gatewayUrl+"/api/v2/data")
        .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
        .build();

public WebClient apiClient = loadBalancedWebClientBuilder()
        .baseUrl(url)
        .defaultHeader(HttpHeaders.CONTENT_TYPE, MediaType.APPLICATION_JSON_VALUE)
        .build();

如何使用 Spring Boot Reactive 重复调用常规 API?

【问题讨论】:

你试过repeatWhen @Superchamp 谢谢!这就是我所需要的,感谢所有帮助。 【参考方案1】:

没有找到解决这个问题的完美方案,但找到了足够接近的东西。

下面的方法可以多次调用API,并在发现余额大于0后采取一定的行动。它利用了Flux.repeat和一个BooleanSupplier。

问题是它会在找到大于 0 的余额后继续调用 API。另一个与第一个问题相关的问题是,在运行结束时它会抛出服务不可用异常。这可能与这段代码找到大于0的余额后无法停止有关。

/**
 * Check network consistently to see that the transaction went through
 *
 * This method gives us the amount that's deposited to the deposit account
 * and posts a transaction from the deposit account to the house account.
 *
 * @param intervalDuration
 * @param watchDuration
 * @param toAddress
 * @param fromAddress
 * @return
 */
//TODO: Fix this method, after it finds the balance and
// posts the transactio it keeps checking for the balance
//TODO: Handle Service Unavailable Exception thrown when
// the watch duration expires
public Flux<TransactionStatus> watchAndTransact(
        Duration intervalDuration, Duration watchDuration,
        String toAddress, String fromAddress) 

    log.info("Checking balance ", Instant.now());

    //Boolean that determines how long we repeat this interval
    BooleanSupplier isBalanceAbove0 = () -> 
        Float balance = networkWatchService.getBalance(fromAddress).block();
        if (balance > 0) 
            log.info("Balance Present ", balance.toString());
            return true;
        
        return false;
    ;

    return Flux.interval(intervalDuration)
        .take(watchDuration)
        .flatMap(aFloat -> networkWatchService.getBalance(fromAddress))
        .flatMap(balance -> 
            log.info("Checking balance ", Instant.now());
            //If we find that an amount was deposited into the deposit account
            if (balance > 0) 
                log.info("Balance Present ", balance.toString());
                //Build a transaction
                Transaction transaction = 
                        new Transaction(fromAddress, toAddress, balance);
                //Post the transaction
                return postTransactionService.postTransaction(transaction);
            
            return Mono.empty();
        )
        .repeat(isBalanceAbove0)
        .switchIfEmpty(s -> s.onError(new Exception("Balance insufficient")));

【讨论】:

以上是关于在获得某个值之前,如何继续调用 API?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用事件发射器调用返回值的函数,并在继续之前等待响应?

如何在调用我的 api 之前减慢/​​等待?

组合框架:如何在继续之前异步处理数组的每个元素

如何调用百度地图API

如何调用百度地图API

更改文本字段,调用 api - 如何限制这个?