Java API 调用将根据队列大小进行合并或在最旧项目插入队列后 4 秒内执行

Posted

技术标签:

【中文标题】Java API 调用将根据队列大小进行合并或在最旧项目插入队列后 4 秒内执行【英文标题】:Java API calls to be consolidated based on queue size or executed within 4 seconds of the oldest item being inserted into the queue 【发布时间】:2020-11-04 04:45:38 【问题描述】:

我无法找到基于队列大小合并 API 调用的最佳方法,也无法在最旧项目插入队列后的 4 秒内执行。请在问题说明下方找到:

问题陈述:

为了防止 API 因查询调用而过载,我想合并每个 API 端点的调用。

每个单独 API 的所有传入请求都应保存在一个队列中,并作为 一旦达到单个 API 的 4 次调用上限。如果达到特定 API 的上限,将使用带有逗号分隔值的 q 参数发送单个请求。因此,如果未达到特定服务的队列上限,调用者将不会收到对其请求的响应。为了解决这个问题,我希望服务队列也能在最旧的项目被插入队列后的 4 秒内发送出去。

Example :

GET http://<host>:8080/products?q=190763,190764
200 OK
content-type: application/json

"190763": 140.00
"190764": 250.00


If there is a caller querying API and the queue of the Products API holds 4 
requests, the next request to the Products API will trigger the actual bulk 
request to be made. Each API will have its own queue.

我正在使用 Spring Boot。我尝试将 CompletableFuture 与执行器服务一起使用,但无法解决这个问题。任何帮助将不胜感激。

【问题讨论】:

您是否考虑过使用像 Hystrix 或 Resilience4j 这样的弹性框架? @daniu 不。我想也许这可以通过检查队列大小来解决,如果小于 5 然后添加到队列中并使用 CompletableFuture 的 allOf 和 join 方法等待它们全部完成。但是来不及了。 【参考方案1】:

您的执行者服务方法是有效的。我尝试了一个简单的示例,其中包含 3 个任务和一个预定的执行器服务:

任务创建传入请求 任务检查队列是否有超过 4 个请求并调用 API 任务检查等待时间是否为 4 秒并调用 API

对 API 的调用始终合并队列中的请求。队列必须是线程安全的。

public class ConsolidateApiCalls 

public static final ConcurrentLinkedDeque<String> queue = new ConcurrentLinkedDeque<>();
public static LocalTime timeAdded = LocalTime.now();

public static void main(String[] args) throws JSONException, ExecutionException, InterruptedException 
    ScheduledExecutorService executor = Executors.newScheduledThreadPool(4);
    executor.scheduleAtFixedRate(() -> 
                queue.add("NL");
                if (queue.size() == 1) timeAdded = LocalTime.now();
            ,
            1, 500, TimeUnit.MILLISECONDS);
    executor.scheduleAtFixedRate(() -> 
                if (queue.size() >= 4) 
                    callAPI("size");
                
            ,
            1, 200, TimeUnit.MILLISECONDS);
    executor.scheduleAtFixedRate(() -> 
                if (Duration.between(LocalTime.now(), timeAdded.plusSeconds(4)).isNegative() && !queue.isEmpty()) 
                    callAPI("time");
                
            ,
            1, 500, TimeUnit.MILLISECONDS);


public static void callAPI(String reason) 
    System.out.println("API called due to " + reason + " limit.");
    StringBuilder result = new StringBuilder();
    queue.forEach(s -> result.append(s).append(": ").append(ThreadLocalRandom.current().nextDouble()).append(", "));
    queue.clear();
    System.out.println(result);

您可以更改每个任务的周期,并查看由于大小和时间限制而调用 API 的频率。

【讨论】:

此代码有效,但如果我们第一次通过浏览器或邮递员测试 API 调用,我们会得到回复 null 而调用应该等待 4 秒或另外 3 个请求,直到显示响应。因此,不接受这是完整的答案。我尝试使用 completableFuture 在发送任何响应之前等待调用完成,但仍然没有成功。

以上是关于Java API 调用将根据队列大小进行合并或在最旧项目插入队列后 4 秒内执行的主要内容,如果未能解决你的问题,请参考以下文章

Java中setState(true)一般啥意思

Java - Queue API

在 Kubernetes 中,如何根据队列的大小进行自动缩放?

java多线程 -- ForkJoinPool 分支/ 合并框架 工作窃取

限制 API 调用队列并返回结果的速率

vue异步渲染