如何将 Promise.all() 限制为每秒 5 个 Promise?

Posted

技术标签:

【中文标题】如何将 Promise.all() 限制为每秒 5 个 Promise?【英文标题】:How to throttle Promise.all() to 5 promises per second? 【发布时间】:2019-05-25 16:10:36 【问题描述】:

我有几个项目需要查询 3rd 方 API,并且说 API 的调用限制为每秒 5 次调用。我需要以某种方式将对 API 的调用限制为每秒最多 5 次调用。

到目前为止,我刚刚在一组承诺上使用了Promise.all(),其中每个承诺向 API 发送一个请求,并在 API 以 HTTP 状态代码 200 响应时解析,并在它以其他一些响应时拒绝状态码。但是,当数组中有超过 5 个项目时,我冒着 Promise.all() 拒绝的风险。

如何将Promise.all() 调用限制为每秒 5 次调用?

【问题讨论】:

也发布一些代码 Promise - 就像请求的影子。已经发送的请求。牢记这一点,当您已经获得承诺时,无法限制请求。尝试检查您用于请求的库 - 也许它已经具有限制请求的功能 你不能用Promise.all做任何事情,因为在那个阶段所有的承诺都已经创建并且所有的任务都已经运行了。 Promise.all 不会“调用”任何东西,它会等待现有的东西。您首先需要在创建 Promise 的循环中限制 API 调用。 看看***.com/a/39197252/1048572和***.com/a/38778887/1048572 这能回答你的问题吗? Throttle amount of promises open at a given time 【参考方案1】:

也许我头脑简单,但我写的这个版本只是将传入的数组分成 5 个 promise 的块,并在每个块上执行 Promise.all()

utility.throttledPromiseAll = async (promises) => 
  const MAX_IN_PROCESS = 5;
  const results = new Array(promises.length);

  async function doBlock(startIndex) 
    // Shallow-copy a block of promises to work on
    const currBlock = promises.slice(startIndex, startIndex + MAX_IN_PROCESS);
    // Await the completion. If any fail, it will throw and that's good.
    const blockResults = await Promise.all(currBlock);
    // Assuming all succeeded, copy the results into the results array
    for (let ix = 0; ix < blockResults.length; ix++) 
      results[ix + startIndex] = blockResults[ix];
    
  

  for (let iBlock = 0; iBlock < promises.length; iBlock += MAX_IN_PROCESS) 
    await doBlock(iBlock);
  
  return results;
;

【讨论】:

头脑不简单!简单的解决方案也有一定的优点。请记住,在 whole 块完成之前,脚本不会进入下一个块。【参考方案2】:

我们可以使用生成器来发送一组中的承诺列表。 一旦解决了第一个收益,我们就可以进行另一个收益。 我们将结果存储在一个数组中。 一旦 promiseArray 长度等于结果长度,我们就可以解决 包装的承诺。

const fetch = require("isomorphic-fetch");
const totalPromiseLength = 5;
const requestMethod = url => () => fetch(url).then(response => response.json());
let promiseArray = [...new Array(totalPromiseLength).keys()].map(index =>
  requestMethod("https://jsonplaceholder.typicode.com/todos/" + (index + 1))
);
function* chunks(arr, limit) 
  for (let i = 0; i < Math.ceil(arr.length / limit); ++i) 
    yield [...arr].slice(i * limit, i * limit + limit);
  


new Promise(async resolve => 
  let generated = chunks(promiseArray, 2);
  let result = [];
  for (let bla of generated) 
    await Promise.all(bla.map(param => param())).then(response => 
      result = [...result, ...response];
      if (result.length === promiseArray.length) 
        resolve(result);
      
    );
  
).then(response => 
  console.log(response);
);

【讨论】:

【参考方案3】:

我认为您可以将您的问题分为两部分:同时调用不超过 5 个,并确保在最旧的 1 秒之后才发生最新的调用。

第一部分很容易通过惊人的p-limit 库解决——它具有迄今为止我见过的最简单的界面。

对于第二部分,您需要实际跟踪每个呼叫何时开始 - 即实现等待功能: 基本伪代码,未测试:

import pLimit from 'p-limit';
const apiLimit = pLimit(5);

const startTimes = [];

async function rateLimiter(item) 
  const lastSecond = (new Date().getTime()) - 1000;
  if (startTimes.filter(v => v > lastSecond).length >= 5) 
    await new Promise(r => setTimeout(r, 1000));
  
  // TODO: cleanup startTimes to avoid memory leak
  startTimes.push(new Date().getTime());
  return apiCall(item);


await Promise.all(items.map(v => apiLimit(() => rateLimiter(v))))

【讨论】:

为了改进这个答案,p-queue 内置了 interval 和 intervalCap,它允许您使用更简单的 API 来执行此操作。感谢您的推荐,它让我找到了这个模块。【参考方案4】:

在没有库的情况下使用 ES6

export async function asyncForEach(array, callback) 
  for (let index = 0; index < array.length; index++) 
    await callback(array[index], index, array);
  

export function split(arr, n) 
  var res = [];
  while (arr.length) 
    res.push(arr.splice(0, n));
  
  return res;

export const delayMS = (t = 200) => 
  return new Promise(resolve => 
    setTimeout(() => 
      resolve(t);
    , t);
  );
;
export const throttledPromises = (
  asyncFunction,
  items = [],
  batchSize = 1,
  delay = 0
) => 
  return new Promise(async (resolve, reject) => 
    const output = [];
    const batches= split(items, batchSize);
    await asyncForEach(batches, async (batch) => 
      const promises = batch.map(asyncFunction).map(p => p.catch(reject));
      const results = await Promise.all(promises);
      output.push(...results);
      await delayMS(delay);
    );
    resolve(output);
  );
;

【讨论】:

在我尝试过的所有解决方案中,这个效果非常好。谢谢!【参考方案5】:

如果您不太担心顺序解决 Promise,可以使用 bluebird 中的并发选项。

下面将一次只处理 5 个查询。

const Promise = require('bluebird');

const buildQueries = (count) => 
  let queries = [];

  for(let i = 0; i < count; i++) 
    queries.push(user: i);
  ;

  return queries;
;

const apiCall = (item) => 
  return new Promise(async (resolve, reject) => 
    await Promise.delay(1000);
    resolve(item.user);
  );
;

const queries = buildQueries(20);

Promise.map(queries, async query => 
  console.log( await apiCall(query) );
, concurrency: 5);

【讨论】:

【参考方案6】:

希望对你有帮助。

还可以说,这将使用Promise.all 来解决所有请求,如果您有大量查询,这将等待所有请求都解决,并且可能会导致您的代码等待大量响应以获取所有响应。 而且,如果其中一个请求被拒绝,Promise.all 将拒绝。

我建议,如果您不需要将所有结果放在一起,最好使用 lodash debounce 或 throttle 之类的其他东西或处理此问题的框架。

let items = [
    name: 'item1', 
    name: 'item2', 
    name: 'item3', 
    name: 'item4', 
    name: 'item5', 
    name: 'item6'
];

// This is the api request that you send and return a promise
function apiCall(item) 
  return new Promise((resolve) => 
    setTimeout(() => resolve(item.name), 1000);
  )


new Promise((resolve) => 
  let results = [];

  function sendReq (itemsList, iterate, apiCall) 
    setTimeout(() => 
      // slice itemsList to send request according to the api limit
      let slicedArray = itemsList.slice(iterate * 5, (iterate * 5 + 5));
      result = slicedArray.map(item => apiCall(item));
      results = [...results, ...result];

      // This will resolve the promise when reaches to the last iteration
      if (iterate === Math.ceil(items.length / 5) - 1) 
          resolve(results);
      
    , (1000 * iterate)); // every 1000ms runs (api limit of one second)
  

  // This will make iteration to split array (requests) to chunks of five items 
  for (i = 0; i < Math.ceil(items.length / 5); i++) 
    sendReq(items, i, apiCall);
  
).then(Promise.all.bind(Promise)).then(console.log);
// Use Promise.all to wait for all requests to resolve
// To use it this way binding is required

【讨论】:

确定问题实际上是关于去抖动是这里的关键。

以上是关于如何将 Promise.all() 限制为每秒 5 个 Promise?的主要内容,如果未能解决你的问题,请参考以下文章

Promise.all 限制并发

Promise.all 限制并发

Promise.all并发限制

Promise.all并发限制

Promise.all并发限制

Promise.all并发限制