使用 ES6 的 Promise.all() 时限制并发的最佳方法是啥?

Posted

技术标签:

【中文标题】使用 ES6 的 Promise.all() 时限制并发的最佳方法是啥?【英文标题】:What is the best way to limit concurrency when using ES6's Promise.all()?使用 ES6 的 Promise.all() 时限制并发的最佳方法是什么? 【发布时间】:2017-03-31 01:58:31 【问题描述】:

我有一些代码正在遍历从数据库中查询出来的列表,并为该列表中的每个元素发出 HTTP 请求。该列表有时可能是一个相当大的数字(以数千计),我想确保我不会访问具有数千个并发 HTTP 请求的 Web 服务器。

此代码的缩写版本目前看起来像这样......

function getCounts() 
  return users.map(user => 
    return new Promise(resolve => 
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => 
        /* snip */
        resolve();
      );
    );
  );


Promise.all(getCounts()).then(() =>  /* snip */);

此代码在节点 4.3.2 上运行。重申一下,是否可以管理 Promise.all,以便在任何给定时间只有一定数量的 Promise 正在进行?

【问题讨论】:

Limit concurrency of promise being run 的可能重复项 别忘了Promise.all does manage promise progression - 承诺会自己做,Promise.all 只是在等待它们。 避免Promise constructor antipattern! gist.github.com/alexpsi/… 【参考方案1】:

P-限制

我将 promise 并发限制与自定义脚本、bluebird、es6-promise-pool 和 p-limit 进行了比较。我相信p-limit 有最简单、最精简的实现来满足这种需求。 See their documentation.

要求

为了兼容示例中的异步

ECMAScript 2017 (version 8) 节点版本>8.2.1

我的例子

在这个例子中,我们需要为数组中的每个 URL 运行一个函数(比如,可能是一个 API 请求)。这里称为fetchData()。如果我们有一个包含数千个项目的数组要处理,并发对于节省 CPU 和内存资源肯定是有用的。

const pLimit = require('p-limit');

// Example Concurrency of 3 promise at once
const limit = pLimit(3);

let urls = [
    "http://www.exampleone.com/",
    "http://www.exampletwo.com/",
    "http://www.examplethree.com/",
    "http://www.examplefour.com/",
]

// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url => 

    // wrap the function we are calling in the limit function we defined above
    return limit(() => fetchData(url));
);

(async () => 
    // Only three promises are run at once (as defined above)
    const result = await Promise.all(promises);
    console.log(result);
)();

控制台日志结果是您已解决的承诺响应数据的数组。

【讨论】:

感谢这个!这个简单多了 这是迄今为止我见过的用于限制同时请求的最佳库。还有很好的例子,谢谢! 感谢您进行比较。你和github.com/rxaviers/async-pool比较过吗? 易于使用,不错的选择。 除了这个之外,每周在 npm 上的下载量约为 2000 万,而其他答案中提到的其他库的下载量约为 200-100k。【参考方案2】:

请注意,Promise.all() 不会触发 Promise 开始工作,而创建 Promise 本身会触发。

考虑到这一点,一种解决方案是在解决承诺时检查是否应该启动新的承诺,或者您是否已经达到限制。

但是,这里真的没有必要重新发明***。 One library that you could use for this purpose is es6-promise-pool。从他们的例子中:

// On the Web, leave out this line and use the script tag above instead. 
var PromisePool = require('es6-promise-pool')

var promiseProducer = function () 
  // Your code goes here. 
  // If there is work left to be done, return the next work item as a promise. 
  // Otherwise, return null to indicate that all promises have been created. 
  // Scroll down for an example. 


// The number of promises to process simultaneously. 
var concurrency = 3

// Create a pool. 
var pool = new PromisePool(promiseProducer, concurrency)

// Start the pool. 
var poolPromise = pool.start()

// Wait for the pool to settle. 
poolPromise.then(function () 
  console.log('All promises fulfilled')
, function (error) 
  console.log('Some promise rejected: ' + error.message)
)

【讨论】:

不幸的是 es6-promise-pool 重新发明了 Promise 而不是使用它们。我建议使用这个简洁的解决方案(如果您已经使用 ES6 或 ES7)github.com/rxaviers/async-pool 看了一下,异步池看起来好多了!更直接、更轻便。 我还发现 p-limit 是最简单的实现。请参阅下面的示例。 ***.com/a/52262024/8177355 我认为 tiny-asyc-pool 在限制 Promise 并发方面要好得多、非侵入性且相当自然的解决方案。 异步池本质上是不同的,它使用一组承诺而不是返回承诺或空值的函数来完成【参考方案3】:

使用Array.prototype.splice

while (funcs.length) 
  // 100 at a time
  await Promise.all( funcs.splice(0, 100).map(f => f()) )

【讨论】:

这是一个被低估的解决方案。喜欢简单。 这将批量运行函数而不是池,其中一个函数在另一个函数完成时立即被调用。 喜欢这个解决方案! 在缺乏更多上下文的情况下花了一秒钟的时间来掌握它在做什么,例如它是一个批处理而不是一个池。每次从开头或中间拼接时,您都会重新排序数组。 (浏览器必须重新索引所有项目)理论性能更好的替代方案是从末尾取东西而不是arr.splice(-100) 如果顺序不正确,也许你可以反转数组:P 对于批量运行非常有用。注意:在当前批次 100% 完成之前,下一批不会开始。【参考方案4】:

如果您知道迭代器是如何工作的以及它们是如何被使用的,那么您就不需要任何额外的库,因为您自己构建自己的并发会变得非常容易。让我演示一下:

/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()


// loop over all items with for..of
for (const x of iterator) 
  console.log('x:', x)
  
  // notices how this loop continues the same iterator
  // and consumes the rest of the iterator, making the
  // outer loop not logging any more x's
  for (const y of iterator) 
    console.log('y:', y)
  

我们可以使用相同的迭代器并在工作人员之间共享。

如果您使用.entries() 而不是.values(),您将获得一个带有[[index, value]] 的二维数组,我将在下面以 2 的并发性进行演示

const sleep = t => new Promise(rs => setTimeout(rs, t))

async function doWork(iterator) 
  for (let [index, item] of iterator) 
    await sleep(1000)
    console.log(index + ': ' + item)
  


const iterator = Array.from('abcdefghij').entries()
const workers = new Array(2).fill(iterator).map(doWork)
//    ^--- starts two workers sharing the same iterator

Promise.allSettled(workers).then(() => console.log('done'))

这样做的好处是您可以拥有generator function,而不是一次准备好一切。

更棒的是,您可以在节点中执行stream.Readable.from(iterator)(最终也可以在 whatwg 流中)。并且使用可转移的 ReadbleStream,如果您正在与网络工作者一起工作以进行表演,这使得该功能在该功能中非常有用


注意: 与示例 async-pool 相比,它的不同之处在于它产生了两个工作人员,因此如果一个工作人员由于某种原因在索引 5 处抛出错误,它不会停止另一个工作人员工人做剩下的。所以你从 2 个并发减少到 1 个。(所以它不会停在那里)所以我的建议是你在 doWork 函数中捕获所有错误

【讨论】:

这太棒了!谢谢无尽! 这绝对是一个很酷的方法!只要确保你的并发不超过你的任务列表的长度(如果你关心结果的话),因为你最终可能会得到额外的! 当 Streams 获得 Readable.from(iterator) 支持时,可能会更酷一些。 Chrome 已经制作了流 transferable。因此您可以创建可读的流并将其发送给网络工作者,所有这些最终都将使用相同的底层迭代器。 它绝对应该作为 NPM 模块发布,我想使用它。 @KrisOye 当并发超过任务列表的长度时,我不会重现问题。我刚刚尝试用 20 个工人运行代码 sn-p (new Array(20));它按要求完成,没有额外内容。 (额外的工人立即完成,因为迭代器在他们启动之前就完成了。)【参考方案5】:

不要使用 promise 来限制 http 请求,而是使用 node 的内置 http.Agent.maxSockets。这消除了使用库或编写自己的池代码的要求,并具有额外的优势,可以更好地控制您的限制。

agent.maxSockets

默认设置为无穷大。确定代理可以为每个源打开多少个并发套接字。 Origin 是 'host:port' 或 'host:port:localAddress' 组合。

例如:

var http = require('http');
var agent = new http.Agent(maxSockets: 5); // 5 concurrent connections per origin
var request = http.request(..., agent: agent, ...);

如果向同一来源发出多个请求,将 keepAlive 设置为 true 也可能对您有所帮助(有关更多信息,请参阅上面的文档)。

【讨论】:

仍然,立即创建数千个闭包并池化套接字似乎效率不高?【参考方案6】:

bluebird 的Promise.map 可以采用并发选项来控制应并行运行的承诺数量。有时它比.all 更容易,因为您不需要创建promise 数组。

const Promise = require('bluebird')

function getCounts() 
  return Promise.map(users, user => 
    return new Promise(resolve => 
      remoteServer.getCount(user) // makes an HTTP request
      .then(() => 
        /* snip */
        resolve();
       );
    );
  , concurrency: 10); // <---- at most 10 http requests at a time

【讨论】:

如果你需要更快的承诺,bluebird 会很高兴,如果你只将它用于一件事,那么它会额外增加约 18kb 的垃圾;) 一切都取决于一件事对你来说有多重要,以及是否有其他更快/更好的方法。一个典型的权衡。我会选择易于使用和功能超过几 kb,但 YMMV。【参考方案7】:

我建议库异步池:https://github.com/rxaviers/async-pool

npm install tiny-async-pool

说明:

使用原生 ES6/ES7 以有限的并发性运行多个 promise-returning 和异步函数

asyncPool 在有限的并发池中运行多个 promise-returning & async 函数。一旦其中一个承诺被拒绝,它就会立即拒绝。它在所有承诺完成时解决。它尽快(在并发限制下)调用迭代器函数。

用法:

const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.

【讨论】:

为我工作。谢谢。这是一个很棒的图书馆。 我需要用Promise.allSettled() 处理拒绝,所以这不起作用【参考方案8】:

这是我的 ES7 解决方案,可用于复制粘贴友好且功能完整的 Promise.all()/map() 替代方案,具有并发限制。

类似于Promise.all(),它维护返回顺序以及非承诺返回值的后备。

我还对不同的实现进行了比较,因为它说明了一些其他解决方案遗漏的一些方面。

用法

const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4

实施

async function asyncBatch(args, fn, limit = 8) 
  // Copy arguments to avoid side effects
  args = [...args];
  const outs = [];
  while (args.length) 
    const batch = args.splice(0, limit);
    const out = await Promise.all(batch.map(fn));
    outs.push(...out);
  
  return outs;


async function asyncPool(args, fn, limit = 8) 
  return new Promise((resolve) => 
    // Copy arguments to avoid side effect, reverse queue as
    // pop is faster than shift
    const argQueue = [...args].reverse();
    let count = 0;
    const outs = [];
    const pollNext = () => 
      if (argQueue.length === 0 && count === 0) 
        resolve(outs);
       else 
        while (count < limit && argQueue.length) 
          const index = args.length - argQueue.length;
          const arg = argQueue.pop();
          count += 1;
          const out = fn(arg);
          const processOut = (out, index) => 
            outs[index] = out;
            count -= 1;
            pollNext();
          ;
          if (typeof out === 'object' && out.then) 
            out.then(out => processOut(out, index));
           else 
            processOut(out, index);
          
        
      
    ;
    pollNext();
  );

比较

// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() => 
  console.log(delay);
  resolve(delay);
, delay));

// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];

// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.

// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms

// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms

// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 20, 30
// total time: 45ms

console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3

// Conclusion: Execution order and performance is different,
// but return order is still identical

结论

asyncPool() 应该是最好的解决方案,因为它允许在前一个请求完成后立即启动新请求。

asyncBatch() 包含在比较中,因为它的实现更易于理解,但性能应该更慢,因为需要完成同一批次中的所有请求才能开始下一批。

在这个人为设计的例子中,无限制的 vanilla Promise.all() 当然是最快的,而其他的在现实世界的拥塞情况下可能表现得更理想。

更新

其他人已经建议的异步池库可能是我实现的更好替代方案,因为它的工作原理几乎相同,并且通过巧妙地使用 Promise.race() 实现更简洁:https://github.com/rxaviers/async-pool/blob/master/lib/es7.js

希望我的回答仍然具有教育价值。

【讨论】:

【参考方案9】:

Semaphore 是众所周知的并发原语,旨在解决类似问题。它是非常通用的构造,信号量的实现存在于多种语言中。这就是使用 Semaphore 解决此问题的方式:

async function main() 
  const s = new Semaphore(100);
  const res = await Promise.all(
    entities.map((users) => 
      s.runExclusive(() => remoteServer.getCount(user))
    )
  );
  return res;

我正在使用来自 async-mutex 的 Semaphore 实现,它有不错的文档和 TypeScript 支持。

如果您想深入了解此类主题,可以查看“信号量小书”一书,该书可通过PDF here免费获得

【讨论】:

【参考方案10】:

这是流式传输和“p-limit”的基本示例。它将 http 读取流传输到 mongo db。

const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;


const pipeline = util.promisify(stream.pipeline)

const outputDBConfig = 
    dbURL: 'yr-db-url',
    collection: 'some-collection'
;
const limit = pLimit(3);

async yrAsyncStreamingFunction(readStream) => 
        const mongoWriteStream = streamToMongoDB(outputDBConfig);
        const mapperStream = es.map((data, done) => 
                let someDataPromise = limit(() => yr_async_call_to_somewhere())

                    someDataPromise.then(
                        function handleResolve(someData) 

                            data.someData = someData;    
                            done(null, data);
                        ,
                        function handleError(error) 
                            done(error)
                        
                    );
                )

            await pipeline(
                readStream,
                JSONStream.parse('*'),
                mapperStream,
                mongoWriteStream
            );
        

【讨论】:

【参考方案11】:

可以用递归来解决。

这个想法是,最初您发送最大允许数量的请求,并且这些请求中的每一个都应该在完成时递归地继续发送自己。

function batchFetch(urls, concurrentRequestsLimit) 
    return new Promise(resolve => 
        var documents = [];
        var index = 0;

        function recursiveFetch() 
            if (index === urls.length) 
                return;
            
            fetch(urls[index++]).then(r => 
                documents.push(r.text());
                if (documents.length === urls.length) 
                    resolve(documents);
                 else 
                    recursiveFetch();
                
            );
        

        for (var i = 0; i < concurrentRequestsLimit; i++) 
            recursiveFetch();
        
    );


var sources = [
    'http://www.example_1.com/',
    'http://www.example_2.com/',
    'http://www.example_3.com/',
    ...
    'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents => 
   console.log(documents);
);

【讨论】:

这种方法完全忽略了错误处理。【参考方案12】: @tcooc 的回答很酷。不知道,将来会利用它。 我也很喜欢 @MatthewRideout 的回答,但它使用了外部库!!

只要有可能,我都会尝试自己开发这类东西,而不是去图书馆。你最终会学到很多以前看起来令人生畏的概念。

 class Pool
        constructor(maxAsync) 
            this.maxAsync = maxAsync;
            this.asyncOperationsQueue = [];
            this.currentAsyncOperations = 0
        

        runAnother() 
            if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync) 
                this.currentAsyncOperations += 1;
                this.asyncOperationsQueue.pop()()
                    .then(() =>  this.currentAsyncOperations -= 1; this.runAnother() , () =>  this.currentAsyncOperations -= 1; this.runAnother() )
            
        

        add(f)  // the argument f is a function of signature () => Promise
            this.runAnother();
            return new Promise((resolve, reject) => 
                this.asyncOperationsQueue.push(
                    () => f().then(resolve).catch(reject)
                )
            )
        
    

//#######################################################
//                        TESTS
//#######################################################

function dbCall(id, timeout, fail) 
    return new Promise((resolve, reject) => 
        setTimeout(() => 
            if (fail) 
               reject(`Error for id $id`);
             else 
                resolve(id);
            
        , timeout)
    
    )



const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);


const cappedPool = new Pool(2);

const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: $i`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: $i`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: $i`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: $i`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: $i`))

这种方法提供了一个不错的 API,类似于 scala/java 中的线程池。 在使用 const cappedPool = new Pool(2) 创建池的一个实例后,您只需使用 cappedPool.add(() =&gt; myPromise) 向它提供承诺。 不知不觉我们必须确保 promise 不会立即启动,这就是为什么我们必须借助函数“延迟提供”。

最重要的是,请注意 add 方法的结果是一个 Promise,它将以你原来的 Promise 的值完成/解决!这使得使用非常直观。

const resultPromise = cappedPool.add( () => dbCall(...))
resultPromise
.then( actualResult => 
   // Do something with the result form the DB
  
)

【讨论】:

【参考方案13】:

正如此答案线程中的所有其他人所指出的,如果您需要限制并发性,Promise.all() 不会做正确的事情。但理想情况下,您甚至不应该想要等到所有的 Promise 完成后再处理它们。

相反,您希望在每个结果可用时尽快对其进行处理,因此您不必等待最后一个承诺完成后再开始迭代它们。

所以,这里有一个代码示例,它部分基于the answer by Endless 和this answer by T.J. Crowder。

async function* raceAsyncIterators(iterators) 
    async function queueNext(iteratorResult) 
        delete iteratorResult.result; // Release previous result ASAP
        iteratorResult.result = await iteratorResult.iterator.next();
        return iteratorResult;
    ;
    const iteratorResults = new Map(iterators.map(iterator => [
        iterator,
        queueNext( iterator )
    ]));
    while (iteratorResults.size) 
        const winner = await Promise.race(iteratorResults.values());
        if (winner.result.done) 
            iteratorResults.delete(winner.iterator);
         else 
            const  value  = winner.result;
            iteratorResults.set(winner.iterator, queueNext(winner));
            yield value;
        
    


async function* runTasks(maxConcurrency, iterator) 
    // Each worker is an async generator that polls for tasks
    // from the shared iterator.
    // Sharing the iterator ensures that each worker gets unique tasks.
    const workers = new Array(maxConcurrency);
    for (let i = 0; i < maxConcurrency; i++) 
        workers[i] = (async function*() 
            for (const task of iterator) yield await task();
        )();
    

    yield* raceAsyncIterators(workers);


// example tasks that sleep and return a number
function sleep(ms)  return new Promise(r => setTimeout(r, ms)); 

const tasks = [];
for (let i = 0; i < 20; i++) 
    tasks.push(async () => 
        console.log(`start $i`);
        await sleep(Math.random() * 1000);
        console.log(`end $i`);
        return i;
    );


(async () => 
    for await (let value of runTasks(3, tasks.values())) 
        console.log(`output $value`);
    
)()

这里有很多魔法;让我解释一下。

每个“worker”都被声明为来自共享迭代器的async generator(async function*())轮询。

// Each worker is an async iterator that polls for tasks
// from the shared iterator.
// Sharing the iterator ensures that each worker gets unique tasks.
const workers = new Array(maxConcurrency);
for (let i = 0; i < maxConcurrency; i++) 
    workers[i] = (async function*() 
        for (const task of iterator) yield await task();
    )();

如果我们只有一个工人,我们可以调用for await (let result of worker()) 来获取结果流。

但是,由于我们有 N 个工作迭代器,我们想要竞速它们,处理最先产生结果的迭代器的结果。这就是raceAsyncIterators() 函数的作用。类似于Promise.race(),但有 N 个 Promises 迭代器,而不仅仅是 N 个 Promises。

raceAsyncIterators() 异步生成器函数保留一个 iteratorResults 映射,从 iterators(作为键)映射到 iteratorResult 对象的 Promise;每个iteratorResult 都有一个iterator 属性和一个result(等待迭代器的next() Promise 的结果)。

raceAsyncIterators() 调用Promise.race()iteratorResult Promise 竞相完成他们的任务。当获胜的iteratorResult 说它的迭代器完全是done 时,我们将其从地图中删除;否则,我们将其在 iteratorResults 映射中的 Promise 替换为迭代器的 next() Promise 和 yield result 值。

使用runTasks(),我们可以使用for await 循环,如下所示:

for await (const value of runTasks(3, tasks.values())) 
    console.log(`output $value`);

这会按照解决顺序返回每个 Promise 的 value

在示例中,我们生成一个包含 20 个异步任务的数组,sleep 持续随机时间并返回一个数字。 (在现实生活中,你可能会创建一个fetch URL 之类的函数数组。)

该示例调用 runTasks 并有 3 个并发工作人员,因此同时启动的任务不超过 3 个。当任何任务完成时,我们立即将下一个任务排队。 (这优于“批处理”,您一次执行 3 个任务,等待所有三个任务,并且在整个前一批完成之前不要开始下一批三个。)

【讨论】:

我设法使用这个函数来运行一个最大项目的承诺池,它的性能非常好,而且与@supercharge/promise-pool 这样的解决方案相比,它的开销很低,这个答案值得更多投票【参考方案14】:

所以我尝试为我的代码制作一些示例,但由于这仅适用于导入脚本而不是生产代码,因此使用 npm 包batch-promises 对我来说无疑是最简单的路径

注意:需要运行时来支持 Promise 或被 polyfill。

API batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee) Promise:Iteratee 将在每批之后被调用。

用途:

batch-promises
Easily batch promises

NOTE: Requires runtime to support Promise or to be polyfilled.

Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.

Use:
import batchPromises from 'batch-promises';
 
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) => 
 
  // The iteratee will fire after each batch resulting in the following behaviour:
  // @ 100ms resolve items 1 and 2 (first batch of 2)
  // @ 200ms resolve items 3 and 4 (second batch of 2)
  // @ 300ms resolve remaining item 5 (last remaining batch)
  setTimeout(() => 
    resolve(i);
  , 100);
))
.then(results => 
  console.log(results); // [1,2,3,4,5]
);

【讨论】:

【参考方案15】:

如果你不想使用外部库,递归就是答案

downloadAll(someArrayWithData)
  var self = this;

  var tracker = function(next)
    return self.someExpensiveRequest(someArrayWithData[next])
    .then(function()
      next++;//This updates the next in the tracker function parameter
      if(next < someArrayWithData.length)//Did I finish processing all my data?
        return tracker(next);//Go to the next promise
      
    );
  

  return tracker(0); 

【讨论】:

【参考方案16】:

很遗憾,原生 Promise.all 无法做到这一点,所以你必须要有创意。

这是我能找到的最快最简洁的方法,无需使用任何外部库。

它利用了一个新的 javascript 特性,称为迭代器。迭代器基本上会跟踪哪些项目已处理,哪些尚未处理。

为了在代码中使用它,您需要创建一个异步函数数组。每个异步函数都向同一个迭代器询问下一个需要处理的项目。每个函数异步处理自己的项目,完成后向迭代器询问一个新项目。一旦迭代器用完项目,所有的功能就完成了。

感谢@Endless 的启发。

var items = [
    "https://www.***.com",
    "https://www.***.com",
    "https://www.***.com",
    "https://www.***.com",
    "https://www.***.com",
    "https://www.***.com",
    "https://www.***.com",
    "https://www.***.com",
];

var concurrency = 5

Array(concurrency).fill(items.entries()).map(async (cursor) => 
    for(let [index, url] of cursor)
        console.log("getting url is ", index, url);
        // run your async task instead of this next line
        var text = await fetch(url).then(res => res.text());
        console.log("text is", text.slice(0,20));
    
)

【讨论】:

很好奇为什么这被标记了。这与我想出的非常相似。【参考方案17】:

这么多好的解决方案。我从@Endless 发布的优雅解决方案开始,最终得到了这个不使用任何外部库也不批量运行的小扩展方法(尽管假设您具有异步等功能):

Promise.allWithLimit = async (taskList, limit = 5) => 
    const iterator = taskList.entries();
    let results = new Array(taskList.length);
    let workerThreads = new Array(limit).fill(0).map(() => 
        new Promise(async (resolve, reject) => 
            try 
                let entry = iterator.next();
                while (!entry.done) 
                    let [index, promise] = entry.value;
                    try 
                        results[index] = await promise;
                        entry = iterator.next();
                    
                    catch (err) 
                        results[index] = err;
                    
                
                // No more work to do
                resolve(true); 
            
            catch (err) 
                // This worker is dead
                reject(err);
            
        ));

    await Promise.all(workerThreads);
    return results;
;

    Promise.allWithLimit = async (taskList, limit = 5) => 
        const iterator = taskList.entries();
        let results = new Array(taskList.length);
        let workerThreads = new Array(limit).fill(0).map(() => 
            new Promise(async (resolve, reject) => 
                try 
                    let entry = iterator.next();
                    while (!entry.done) 
                        let [index, promise] = entry.value;
                        try 
                            results[index] = await promise;
                            entry = iterator.next();
                        
                        catch (err) 
                            results[index] = err;
                        
                    
                    // No more work to do
                    resolve(true); 
                
                catch (err) 
                    // This worker is dead
                    reject(err);
                
            ));
    
        await Promise.all(workerThreads);
        return results;
    ;

    const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve => 
       let n = (i + 1) * 5;
       setTimeout(() => 
          console.log(`Did nothing for $n seconds`);
          resolve(n);
       , n * 1000);
    ));

    var results = Promise.allWithLimit(demoTasks);

【讨论】:

【参考方案18】:

扩展@deceleratedcaviar 发布的答案,我创建了一个“批处理”实用程序函数,该函数以参数为参数:值数组、并发限制和处理函数。是的,我意识到使用 Promise.all 这种方式更类似于批处理与真正的并发性,但如果目标是一次限制过多的 HTTP 调用,我会选择这种方法,因为它简单且不需要外部库.

async function batch(o) 
  let arr = o.arr
  let resp = []
  while (arr.length) 
    let subset = arr.splice(0, o.limit)
    let results = await Promise.all(subset.map(o.process))
    resp.push(results)
  
  return [].concat.apply([], resp)


let arr = []
for (let i = 0; i < 250; i++)  arr.push(i) 

async function calc(val)  return val * 100 

(async () => 
  let resp = await batch(
    arr: arr,
    limit: 100,
    process: calc
  )
  console.log(resp)
)();

【讨论】:

【参考方案19】:

使用自定义承诺库 (CPromise) 的另一种解决方案:

使用生成器Live codesandbox demo
    import  CPromise  from "c-promise2";
    import cpFetch from "cp-fetch";
    
    const promise = CPromise.all(
      function* () 
        const urls = [
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
          "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
        ];
    
        for (const url of urls) 
          yield cpFetch(url); // add a promise to the pool
          console.log(`Request [$url] completed`);
        
      ,
       concurrency: 2 
    ).then(
      (v) => console.log(`Done: `, v),
      (e) => console.warn(`Failed: $e`)
    );
    
    // yeah, we able to cancel the task and abort pending network requests
    // setTimeout(() => promise.cancel(), 4500);
使用映射器Live codesandbox demo

    import  CPromise  from "c-promise2";
    import cpFetch from "cp-fetch";
    
    const promise = CPromise.all(
      [
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
        "https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
      ],
      
        mapper: (url) => 
          console.log(`Request [$url]`);
          return cpFetch(url);
        ,
        concurrency: 2
      
    ).then(
      (v) => console.log(`Done: `, v),
      (e) => console.warn(`Failed: $e`)
    );
    
    // yeah, we able to cancel the task and abort pending network requests
    //setTimeout(() => promise.cancel(), 4500);

【讨论】:

【参考方案20】:

警告这尚未针对效率进行基准测试,并且会进行大量数组复制/创建

如果您想要更实用的方法,您可以执行以下操作:

import chunk from 'lodash.chunk';

const maxConcurrency = (max) => (dataArr, promiseFn) =>
  chunk(dataArr, max).reduce(
      async (agg, batch) => [
          ...(await agg),
          ...(await Promise.all(batch.map(promiseFn)))
      ],
      []
  );

然后你可以像这样使用它:

const randomFn = (data) =>
    new Promise((res) => setTimeout(
      () => res(data + 1),
        Math.random() * 1000
      ));


const result = await maxConcurrency(5)(
    [1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
    randomFn
);
console.log('result+++', result);

【讨论】:

【参考方案21】:

我一直在使用 bottleneck 库,我真的很喜欢它,但在我的情况下,它并没有释放内存,而是一直在处理长时间运行的作业……这对于运行你可能的大量作业来说并不好首先需要一个节流/并发库。

我需要一个简单、开销低、易于维护的解决方案。我还想要让池保持充值的东西,而不是简单地批处理预定义的块......在下载器的情况下,这将阻止 nGB 文件阻止你的队列几分钟/小时一次,即使其余的批次在很久以前就完成了。

这是我一直在使用的 Node.js v16+、无依赖、异步生成器解决方案:

const promiseState = function( promise ) 
  // A promise could never resolve to a unique symbol unless it was in this scope
  const control = Symbol();

  // This helps us determine the state of the promise... A little heavy, but it beats a third-party promise library. The control is the second element passed to Promise.race() since it will only resolve first if the promise being tested is pending.
  return Promise
    .race([ promise, control ])
    .then( value => ( value === control ) ? 'pending' : 'fulfilled' )
    .catch( () => 'rejected' );


const throttle = async function* ( reservoir, promiseFunction, highWaterMark ) 
  let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseFunction( item ) );

  while ( iterable.length > 0 ) 
    // When a promise has resolved we have space to top it up to the high water mark...
    await Promise.any( iterable );

    const pending = [];
    const resolved = [];

    // This identifies the promise(s) that have resolved so that we can yield them
    for ( const currentValue of iterable ) 
      if ( await promiseState( currentValue ) === 'pending' ) 
        pending.push( currentValue );
       else 
        resolved.push( currentValue );
      
    

    // Put the remaining promises back into iterable, and top it to the high water mark
    iterable = [
      ...pending,
      ...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseFunction( value ) )
    ];

    yield Promise.allSettled( resolved );
  


// This is just an example of what would get passed as "promiseFunction"... This can be the function that returns your HTTP request promises
const getTimeout = delay => new Promise( (resolve, reject) => setTimeout(resolve, delay, delay) );

// This is just the async IIFE that bootstraps this example
( async () => 

  const test = [ 1000, 2000, 3000, 4000, 5000, 6000, 1500, 2500, 3500, 4500, 5500, 6500 ];

  for await ( const timeout of throttle( test, getTimeout, 4 ) ) 
    console.log( timeout );
  

 )();

【讨论】:

【参考方案22】:

我有创建块并使用 .reduce 函数等待每个块 promise.alls 完成的解决方案。如果承诺有一些调用限制,我也会添加一些延迟。

export function delay(ms: number) 
  return new Promise<void>((resolve) => setTimeout(resolve, ms));


export const chunk = <T>(arr: T[], size: number): T[][] => [
  ...Array(Math.ceil(arr.length / size)),
].map((_, i) => arr.slice(size * i, size + size * i));

const myIdlist = []; // all items
const groupedIdList = chunk(myIdList, 20); // grouped by 20 items

await groupedIdList.reduce(async (prev, subIdList) => 
  await prev;
  // Make sure we wait for 500 ms after processing every page to prevent overloading the calls.
  const data = await Promise.all(subIdList.map(myPromise));
  await delay(500);
, Promise.resolve());

【讨论】:

【参考方案23】:

此解决方案使用async generator 来管理带有香草 JavaScript 的并发承诺。 throttle 生成器接受 3 个参数:

要作为参数提供给 Promise 生成函数的值数组。 (例如,一组 URL。) 返回承诺的函数。 (例如,返回一个 HTTP 请求的承诺。) 一个整数,表示允许的最大并发承诺。

Promise 仅在需要时实例化,以减少内存消耗。结果可以使用for await...of 语句进行迭代。

下面的示例提供了一个检查 Promise 状态的函数、节流异步生成器,以及一个基于 setTimeout 返回 Promise 的简单函数。最后的async IIFE 定义了超时值的存储库,设置throttle 返回的async iterable,然后在结果解析时迭代它们。

如果您想要更完整的 HTTP 请求示例,请在 cmets 中告诉我。

请注意,Node.js 16+ 是必需的才能使用异步生成器。

const promiseState = function( promise ) 
  const control = Symbol();

  return Promise
    .race([ promise, control ])
    .then( value => ( value === control ) ? 'pending' : 'fulfilled' )
    .catch( () => 'rejected' );


const throttle = async function* ( reservoir, promiseClass, highWaterMark ) 
  let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseClass( item ) );

  while ( iterable.length > 0 ) 
    await Promise.any( iterable );

    const pending = [];
    const resolved = [];

    for ( const currentValue of iterable ) 
      if ( await promiseState( currentValue ) === 'pending' ) 
        pending.push( currentValue );
       else 
        resolved.push( currentValue );
      
    

    console.log( pending, resolved, reservoir );

    iterable = [
      ...pending,
      ...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseClass( value ) )
    ];

    yield Promise.allSettled( resolved );
  


const getTimeout = delay => new Promise( ( resolve, reject ) => 
  setTimeout(resolve, delay, delay);
 );

( async () => 
  const test = [ 1100, 1200, 1300, 10000, 11000, 9000, 5000, 6000, 3000, 4000, 1000, 2000, 3500 ];

  const throttledRequests = throttle( test, getTimeout, 4 );

  for await ( const timeout of throttledRequests ) 
    console.log( timeout );
  
 )();

【讨论】:

【参考方案24】:

控制最大承诺/请求数量的一个很好的解决方案是将您的请求列表拆分为页面,并且一次只生成一个页面的请求。

下面的例子使用了iter-ops 库:

import pipe, toAsync, map, page from 'iter-ops';

const i = pipe(
    toAsync(users), // make it asynchronous
    page(10), // split into pages of 10 items in each
    map(p => Promise.all(p.map(u => u.remoteServer.getCount(u)))), // map into requests
    wait() // resolve each page in the pipeline
);

// below triggers processing page-by-page:

for await(const p of i) 
    //=> p = resolved page of data

这样它就不会尝试创建超过一页大小的请求/承诺。

【讨论】:

【参考方案25】:

这就是我使用 Promise.race 所做的,在我的代码中

const identifyTransactions = async function() 
  let promises = []
  let concurrency = 0
  for (let tx of this.transactions) 
    if (concurrency > 4)
      await Promise.race(promises).then(r =>  promises = []; concurrency = 0 )
    promises.push(tx.identifyTransaction())
    concurrency++
  
  if (promises.length > 0)
    await Promise.race(promises) //resolve the rest

如果你想看一个例子:https://jsfiddle.net/thecodermarcelo/av2tp83o/5/

【讨论】:

【参考方案26】:

如果您的目标是减慢 Promise.all 的速度以避免速率限制或过载:

这是我的实现

async function promiseAllGentle(arr, batchSize = 5, sleep = 50) 
  let output = [];
  while (arr.length) 
    const batchResult = await Promise.all(arr.splice(0, batchSize));
    output = [...output, ...batchResult];
    await new Promise((res) => setTimeout(res, sleep));
  
  return output;

【讨论】:

这不会做任何事情,只是在一段时间内错过处理拒绝,导致崩溃。 Promise.all 不能放慢速度,它只是等待而不运行任何任务。 你能解释一下为什么它不会返回任何任务,也不会处理拒绝吗? promiseAllGentle(Array.from(length: 50, (_, i) =&gt; console.log(i+' happens right now'); return i == 20 ? Promise.reject("This will be handled too late") : new Promise(resolve =&gt; setTimeout(resolve, 50)); )) 谢谢,我正在考虑改用 p-limit 或 p-throttle。为什么要重新发明***;)

以上是关于使用 ES6 的 Promise.all() 时限制并发的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章

ES6 Promise.all() 错误句柄 - 是不是需要 .settle()? [复制]

ES6(十三)Promise

es6 promise的使用

es6中promise ALL Race Resolve Reject finish的实现

利用ES6的Promise.all实现至少请求多长时间

ES6 Promise Javascript