尝试使用 async.parallelLimit 进行连续的并行 API 调用

Posted

技术标签:

【中文标题】尝试使用 async.parallelLimit 进行连续的并行 API 调用【英文标题】:Trying to use async.parallelLimit to make to make successive parallel API calls 【发布时间】:2021-01-13 11:47:58 【问题描述】:

我有一个对象数据数组,例如 [number:1、number:2、number:3...number:100]。并希望以 10 个连续批次进行并行 API 调用,直到处理完整个数组。

我该怎么做?

这是我的代码。它超过了前 10 个,但随后就停止了。

const async = require("async");
const axios = require("axios");
const calls = require("../model/data"); // [number:1, number:2,number:3,...number:100]

let makeAPICall = function (request, callback) 
  axios
    .post("http://www.api.com/", 
      number: `$request.number`,
      webhookURL: "http://localhost:8000/",
    )
    .then(function (response) )
    .catch(function (err) 
      callback(err);
    );
;

const functionArray = calls.map((request) => 
  return (callback) => makeAPICall(request, callback);
);

exports.startCalls = (req, res, next) => 
  async.parallelLimit(functionArray, 10, (err, results) => 
    if (err) 
      console.error("Error: ", err);
     else 
      console.log("Results: ", results.length, results);
    
  );
;

【问题讨论】:

对于初学者,您的makeAPICall()函数在axios调用成功时不会调用回调。 谢谢!我修好了。 你是说现在剩下的都可以工作了吗? 它几乎处理了整个事情(除了一个),但不是很有序。最新的批次首先完成。此外,API 向服务器发出的 post 请求是无序的。 并行运行请求意味着它们可以以未知的顺序完成。管理并行请求的一个很好的功能将按照您最初指定请求的顺序收集结果,但是当每个单独的完成时是盲目的运气(您正在并行运行异步操作,因此它们在完成时完成)。如果您绝对希望它们按顺序运行和完成,那么您必须一个接一个地对它们进行排序,而不是并行运行它们。 【参考方案1】:

如果您真的想以固定批次运行它们,并且整个批次在您运行更多请求之前完成,您可以执行以下操作:

const axios = require("axios");
const calls = require("../model/data"); // [number:1, number:2,number:3,...number:100]

function makeAPICall(request) 
  return axios.post("http://www.api.com/", 
      number: `$request.number`,
      webhookURL: "http://localhost:8000/",
  );
;

async function runBatches(array, batchSize, fn) 
    let index = 0;
    let results = [];
    while (index < array.length) 
        let promises = [];
        for (let num = 0; num < batchSize && index < array.length; ++num) 
            promises.push(makeAPICall(array[index++]));
        
        let batchResults = await Promise.all(promises);
        results.push(...batchResults);
    
    return results;


runBatches(calls, 10, makeAPICall).then(results => 
   // all results in order here
   console.log(results);
).catch(err => 
    console.log(err);
);

【讨论】:

【参考方案2】:

因此,您的方法有些倒退,因为您已经返回了一个 Promise (Axios),这是管理异步操作的现代方式,现在您正尝试将其转换回普通回调,以便您可以使用老式库(异步库)。运行 10 也更慢,等待所有 10 完成后再开始,通常不需要这样做。

相反,我建议您使用您已经拥有的 Axios 承诺。您可以使用 Bluebird 的 .map(),它有一个 concurrency 设置,或者您可以使用这段代码,它为您提供了一个函数,用于并行运行 Promise 生成函数,同时控制在任何给定时间运行的最大数量:

// takes an array of items and a function that returns a promise
function mapConcurrent(items, maxConcurrent, fn) 
    let index = 0;
    let inFlightCntr = 0;
    let doneCntr = 0;
    let results = new Array(items.length);
    let stop = false;

    return new Promise(function(resolve, reject) 

        function runNext() 
            let i = index;
            ++inFlightCntr;
            fn(items[index], index++).then(function(val) 
                ++doneCntr;
                --inFlightCntr;
                results[i] = val;
                run();
            , function(err) 
                // set flag so we don't launch any more requests
                stop = true;
                reject(err);
            );
        

        function run() 
            // launch as many as we're allowed to
            while (!stop && inflightCntr < maxConcurrent && index < items.length) 
                runNext();
            
            // if all are done, then resolve parent promise with results
            if (doneCntr === items.length) 
                resolve(results);
            
        

        run();
    );

然后你会像这样使用它:

let makeAPICall = function(request) 
  return axios.post("http://www.api.com/", 
      number: `$request.number`,
      webhookURL: "http://localhost:8000/",
    );
;

mapConcurrent(calls, 10, makeAPICall).then(results => 
   // all results in order here
   console.log(results);
).catch(err => 
    console.log(err);
);

在此处查看另一个类似问题:Promise.all consumes all my RAM

【讨论】:

以上是关于尝试使用 async.parallelLimit 进行连续的并行 API 调用的主要内容,如果未能解决你的问题,请参考以下文章

Async.js - 并行真的是并行的吗?

如何处理 Swift 的“尝试”?导致“尝试的结果?未使用”? [复制]

我正在尝试从 pyspark 访问 mysql 表。我正在尝试使用:

尝试使用堆叠小部件构建布局

尝试使用 Curl 登录网站

不安全的 JavaScript 尝试使用框架访问