限制 API 调用队列并返回结果的速率
Posted
技术标签:
【中文标题】限制 API 调用队列并返回结果的速率【英文标题】:Rate limiting a queue of API calls and returning the results 【发布时间】:2020-12-05 18:24:09 【问题描述】:我正在遍历一个数组并使用 async/await 为每个成员进行 API 调用,然后我将结果推送到另一个返回的数组中。
// My current function
async requestForEach(repos)
const result = [];
for (const repo of repos)
result.push(await this.doSomething(repo.name));
return result;
// doSomething()
const AWS = require('aws-sdk');
const codecommit = new AWS.CodeCommit();
async doSomething(repoName)
return (await codecommit.listBranches(
repoName
).promise()).branches;
我的问题是我的速率受到限制。如果我捕获并打印我得到的错误..
ThrottlingException: Rate exceeded
// Call stack here
code: 'ThrottlingException',
time: 2020-08-16T15:52:56.632Z,
requestId: '****-****-****-****-****',
statusCode: 400,
retryable: true
我正在使用的 API 的文档可以在这里找到 - https://docs.aws.amazon.com/AWSjavascriptSDK/latest/AWS/CodeCommit.html#listBranches-property
我查看了选项,this async library 似乎是最受欢迎的选项。
使用 async.queue()..
添加到队列中的任务被并行处理(直到 并发限制)。如果所有工作人员都在进行中,则任务排队 直到一个可用。一旦工人完成一项任务, 任务的回调被调用。
// create a queue object with concurrency 2 var q = async.queue(function(task, callback) console.log('hello ' + task.name); callback(); , 2);
显然我无法从回调函数中取回值,那么我应该如何解决这个问题?
【问题讨论】:
您需要按顺序拨打电话吗?还是可以进行并行调用? 并行调用没问题 不,你不需要使用 async.js(如果你还在使用它,请确保不要使用回调样式)。您的顺序迭代很好,您需要做的就是在获得ThrottlingException
时添加延迟。
嘿@Bergi,请您详细说明delay
部分。很想听听像您这样的专家的解决方案。它将帮助我们/其他人在类似情况下应用更好的解决方案。像Promise
和setTimeout
一样吗?在此先感谢????
@newprogrammer 您能否链接您正在使用的 API 文档以及其 ThrottlingExceptions
的外观?或者分享this.doSomething
方法的定义?那么也许我可以写一个量身定制的答案。
【参考方案1】:
您可以使用Promise.all 来减少 API 调用的等待时间,如下所示
async requestForEach(repos)
return Promise.all(repos.map(repo => this.doSomething(repo.value)));
由于您遇到了与调用总数有关的rate limit
问题,您可以使用es6-promise-pool 之类的库来管理并发请求(5/10 - 根据您的要求)。
并用 recursion 和 MAX_RETRIES 更新this.doSomething
(控制environment variable
中的MAX_RETRIES
)限制如下
async doSomething(repoName, retries = 0)
try
const data = await codecommit.listBranches(
repoName
).promise();
return data.branches;
catch(err)
if (err.code == 'ThrottlingException' && retries <= MAX_RETRIES)
await delay(err.retryDelay ?? 1000); // As per @Bergi's answer
await doSomething(repoName, retries + 1); // Recursive call
else
console.log('Issue with repo: ', repoName);
throw err; // (Or) return ''; based on requirement
// Filter out the valid results at the end - Applicable only if you use return '';
const results = await requestForEach(repos);
const finalResults = results.filter(Boolean);
这种方法可能会帮助您减少生产中的等待时间,而不是按顺序循环每个请求。
【讨论】:
原谅我的无知,但这会阻止速率限制问题吗? 我已经用错误更新了问题:)。我需要限制请求的速率,这样我就不会首先收到错误,或者可能暂停然后在达到限制后以某种方式继续。我被您的回答所吸引,因为它不使用外部库。 感谢您使用错误详细信息更新问题。Promise.all
也可能达到极限。您可以按照建议使用es6-promise-pool
来限制并发性。在这里使用起来很简单。
promise-pool 库看起来不错,但 Bergi 的解决方案最适合我的需求。谢谢!
感谢@Bergi,分享知识。我已经更新了我的答案。希望它对其他一些类似的用例有所帮助:)【参考方案2】:
连续的for … of
循环对我来说看起来不错。您可以add a default delay for each iteration 使其变慢,但您也可以稍后在请求因限制而失败时重试。请注意,这种方法仅适用于您的应用中只有一个请求源(而不是多个并发调用 requestForEach
),否则您可能需要全局协调。
async doSomething(repoName)
while (true)
try
const data = await codecommit.listBranches(
repoName
).promise();
return data.branches;
catch(err)
if (err.code == 'ThrottlingException') // if (err.retryable)
await delay(err.retryDelay ?? 1000);
continue;
else
throw err;
function delay(time)
return new Promise(resolve =>
setTimeout(resolve, time);
);
递归方法可能看起来更好,而不是 while (true)
循环。请注意,在生产代码中,您需要限制重试次数,这样您的循环就不会无限运行。
【讨论】:
【参考方案3】:看起来你想要parallelLimit。
它需要一个接收结果的可选回调。
来自文档。
https://caolan.github.io/async/v3/docs.html#parallelLimit
回调函数 所有函数成功完成后运行的可选回调。此函数获取一个结果数组(或对象),其中包含传递给任务回调的所有结果参数。使用 (err, results) 调用。
例子:
// run 'my_task' 100 times, with parallel limit of 10
var my_task = function(callback) ... ;
var when_done = function(err, results) ... ;
// create an array of tasks
var async_queue = Array(100).fill(my_task);
async.parallelLimit(async_queue, 10, when_done);
取自: how to use async.parallelLimit to maximize the amount of (paralle) running processes?
【讨论】:
以上是关于限制 API 调用队列并返回结果的速率的主要内容,如果未能解决你的问题,请参考以下文章
Web API 速率限制(四)- 其它和AspNetCoreRateLimit
调用 REST API 时如何处理 Google Ads API 速率限制?
比较两个 API 调用的结果并在 MEAN 应用程序中返回它们的差异