限制 API 调用队列并返回结果的速率

Posted

技术标签:

【中文标题】限制 API 调用队列并返回结果的速率【英文标题】:Rate limiting a queue of API calls and returning the results 【发布时间】:2020-12-05 18:24:09 【问题描述】:

我正在遍历一个数组并使用 async/await 为每个成员进行 API 调用,然后我将结果推送到另一个返回的数组中。

// My current function
async requestForEach(repos) 
    const result = [];
    for (const repo of repos) 
        result.push(await this.doSomething(repo.name));
    
    return result;


// doSomething()
const AWS = require('aws-sdk');
const codecommit = new AWS.CodeCommit();
async doSomething(repoName)
    return (await codecommit.listBranches(
        repoName
    ).promise()).branches;

我的问题是我的速率受到限制。如果我捕获并打印我得到的错误..

ThrottlingException: Rate exceeded 
  // Call stack here
  code: 'ThrottlingException',
  time: 2020-08-16T15:52:56.632Z,
  requestId: '****-****-****-****-****',
  statusCode: 400,
  retryable: true

我正在使用的 API 的文档可以在这里找到 - https://docs.aws.amazon.com/AWSjavascriptSDK/latest/AWS/CodeCommit.html#listBranches-property

我查看了选项,this async library 似乎是最受欢迎的选项。

使用 async.queue()..

添加到队列中的任务被并行处理(直到 并发限制)。如果所有工作人员都在进行中,则任务排队 直到一个可用。一旦工人完成一项任务, 任务的回调被调用。

// create a queue object with concurrency 2
var q = async.queue(function(task, callback) 
    console.log('hello ' + task.name);
    callback();
, 2);

显然我无法从回调函数中取回值,那么我应该如何解决这个问题?

【问题讨论】:

您需要按顺序拨打电话吗?还是可以进行并行调用? 并行调用没问题 不,你不需要使用 async.js(如果你还在使用它,请确保不要使用回调样式)。您的顺序迭代很好,您需要做的就是在获得ThrottlingException 时添加延迟。 嘿@Bergi,请您详细说明delay 部分。很想听听像您这样的专家的解决方案。它将帮助我们/其他人在类似情况下应用更好的解决方案。像PromisesetTimeout 一样吗?在此先感谢???? @newprogrammer 您能否链接您正在使用的 API 文档以及其 ThrottlingExceptions 的外观?或者分享this.doSomething方法的定义?那么也许我可以写一个量身定制的答案。 【参考方案1】:

您可以使用Promise.all 来减少 API 调用的等待时间,如下所示

async requestForEach(repos) 
  return Promise.all(repos.map(repo => this.doSomething(repo.value)));

由于您遇到了与调用总数有关的rate limit 问题,您可以使用es6-promise-pool 之类的库来管理并发请求(5/10 - 根据您的要求)。

并用 recursionMAX_RETRIES 更新this.doSomething(控制environment variable 中的MAX_RETRIES)限制如下

async doSomething(repoName, retries = 0) 
    try 
        const data = await codecommit.listBranches(
            repoName
        ).promise();
        return data.branches;
     catch(err) 
        if (err.code == 'ThrottlingException' && retries <= MAX_RETRIES) 
            await delay(err.retryDelay ?? 1000); // As per @Bergi's answer
            await doSomething(repoName, retries + 1); // Recursive call
         else 
            console.log('Issue with repo: ', repoName);
            throw err; // (Or) return ''; based on requirement
        
    



// Filter out the valid results at the end - Applicable only if you use return '';
const results = await requestForEach(repos);
const finalResults = results.filter(Boolean);

这种方法可能会帮助您减少生产中的等待时间,而不是按顺序循环每个请求。

【讨论】:

原谅我的无知,但这会阻止速率限制问题吗? 我已经用错误更新了问题:)。我需要限制请求的速率,这样我就不会首先收到错误,或者可能暂停然后在达到限制后以某种方式继续。我被您的回答所吸引,因为它不使用外部库。 感谢您使用错误详细信息更新问题。 Promise.all 也可能达到极限。您可以按照建议使用es6-promise-pool 来限制并发性。在这里使用起来很简单。 promise-pool 库看起来不错,但 Bergi 的解决方案最适合我的需求。谢谢! 感谢@Bergi,分享知识。我已经更新了我的答案。希望它对其他一些类似的用例有所帮助:)【参考方案2】:

连续的for … of 循环对我来说看起来不错。您可以add a default delay for each iteration 使其变慢,但您也可以稍后在请求因限制而失败时重试。请注意,这种方法仅适用于您的应用中只有一个请求源(而不是多个并发调用 requestForEach),否则您可能需要全局协调。

async doSomething(repoName) 
    while (true) 
        try 
            const data = await codecommit.listBranches(
                repoName
            ).promise();
            return data.branches;
         catch(err) 
            if (err.code == 'ThrottlingException')  // if (err.retryable) 
                await delay(err.retryDelay ?? 1000);
                continue;
             else 
                throw err;
            
        
    

function delay(time) 
    return new Promise(resolve => 
        setTimeout(resolve, time);
    );

递归方法可能看起来更好,而不是 while (true) 循环。请注意,在生产代码中,您需要限制重试次数,这样您的循环就不会无限运行。

【讨论】:

【参考方案3】:

看起来你想要parallelLimit。

它需要一个接收结果的可选回调。

来自文档。

https://caolan.github.io/async/v3/docs.html#parallelLimit

回调函数 所有函数成功完成后运行的可选回调。此函数获取一个结果数组(或对象),其中包含传递给任务回调的所有结果参数。使用 (err, results) 调用。

例子:

// run 'my_task' 100 times, with parallel limit of 10

  var my_task = function(callback)  ... ;
  var when_done = function(err, results)  ... ;

  // create an array of tasks
  var async_queue = Array(100).fill(my_task);

  async.parallelLimit(async_queue, 10, when_done);

取自: how to use async.parallelLimit to maximize the amount of (paralle) running processes?

【讨论】:

以上是关于限制 API 调用队列并返回结果的速率的主要内容,如果未能解决你的问题,请参考以下文章

Web API 速率限制(四)- 其它和AspNetCoreRateLimit

调用 REST API 时如何处理 Google Ads API 速率限制?

阿里云调用 API 服务后返回啥结果

比较两个 API 调用的结果并在 MEAN 应用程序中返回它们的差异

可以使用 Cloudflare 缓存和保护 REST API 吗?

C语言或者C++如何调用一个http接口并得到返回结果?