使用 ES6 的 Promise.all() 时限制并发的最佳方法是啥?
Posted
技术标签:
【中文标题】使用 ES6 的 Promise.all() 时限制并发的最佳方法是啥?【英文标题】:What is the best way to limit concurrency when using ES6's Promise.all()?使用 ES6 的 Promise.all() 时限制并发的最佳方法是什么? 【发布时间】:2017-03-31 01:58:31 【问题描述】:我有一些代码正在遍历从数据库中查询出来的列表,并为该列表中的每个元素发出 HTTP 请求。该列表有时可能是一个相当大的数字(以数千计),我想确保我不会访问具有数千个并发 HTTP 请求的 Web 服务器。
此代码的缩写版本目前看起来像这样......
function getCounts()
return users.map(user =>
return new Promise(resolve =>
remoteServer.getCount(user) // makes an HTTP request
.then(() =>
/* snip */
resolve();
);
);
);
Promise.all(getCounts()).then(() => /* snip */);
此代码在节点 4.3.2 上运行。重申一下,是否可以管理 Promise.all
,以便在任何给定时间只有一定数量的 Promise 正在进行?
【问题讨论】:
Limit concurrency of promise being run 的可能重复项 别忘了Promise.all
does manage promise progression - 承诺会自己做,Promise.all
只是在等待它们。
避免Promise
constructor antipattern!
gist.github.com/alexpsi/…
【参考方案1】:
P-限制
我将 promise 并发限制与自定义脚本、bluebird、es6-promise-pool 和 p-limit 进行了比较。我相信p-limit 有最简单、最精简的实现来满足这种需求。 See their documentation.
要求
为了兼容示例中的异步
ECMAScript 2017 (version 8) 节点版本>8.2.1我的例子
在这个例子中,我们需要为数组中的每个 URL 运行一个函数(比如,可能是一个 API 请求)。这里称为fetchData()
。如果我们有一个包含数千个项目的数组要处理,并发对于节省 CPU 和内存资源肯定是有用的。
const pLimit = require('p-limit');
// Example Concurrency of 3 promise at once
const limit = pLimit(3);
let urls = [
"http://www.exampleone.com/",
"http://www.exampletwo.com/",
"http://www.examplethree.com/",
"http://www.examplefour.com/",
]
// Create an array of our promises using map (fetchData() returns a promise)
let promises = urls.map(url =>
// wrap the function we are calling in the limit function we defined above
return limit(() => fetchData(url));
);
(async () =>
// Only three promises are run at once (as defined above)
const result = await Promise.all(promises);
console.log(result);
)();
控制台日志结果是您已解决的承诺响应数据的数组。
【讨论】:
感谢这个!这个简单多了 这是迄今为止我见过的用于限制同时请求的最佳库。还有很好的例子,谢谢! 感谢您进行比较。你和github.com/rxaviers/async-pool比较过吗? 易于使用,不错的选择。 除了这个之外,每周在 npm 上的下载量约为 2000 万,而其他答案中提到的其他库的下载量约为 200-100k。【参考方案2】:请注意,Promise.all()
不会触发 Promise 开始工作,而创建 Promise 本身会触发。
考虑到这一点,一种解决方案是在解决承诺时检查是否应该启动新的承诺,或者您是否已经达到限制。
但是,这里真的没有必要重新发明***。 One library that you could use for this purpose is es6-promise-pool
。从他们的例子中:
// On the Web, leave out this line and use the script tag above instead.
var PromisePool = require('es6-promise-pool')
var promiseProducer = function ()
// Your code goes here.
// If there is work left to be done, return the next work item as a promise.
// Otherwise, return null to indicate that all promises have been created.
// Scroll down for an example.
// The number of promises to process simultaneously.
var concurrency = 3
// Create a pool.
var pool = new PromisePool(promiseProducer, concurrency)
// Start the pool.
var poolPromise = pool.start()
// Wait for the pool to settle.
poolPromise.then(function ()
console.log('All promises fulfilled')
, function (error)
console.log('Some promise rejected: ' + error.message)
)
【讨论】:
不幸的是 es6-promise-pool 重新发明了 Promise 而不是使用它们。我建议使用这个简洁的解决方案(如果您已经使用 ES6 或 ES7)github.com/rxaviers/async-pool 看了一下,异步池看起来好多了!更直接、更轻便。 我还发现 p-limit 是最简单的实现。请参阅下面的示例。 ***.com/a/52262024/8177355 我认为 tiny-asyc-pool 在限制 Promise 并发方面要好得多、非侵入性且相当自然的解决方案。 异步池本质上是不同的,它使用一组承诺而不是返回承诺或空值的函数来完成【参考方案3】:使用Array.prototype.splice
while (funcs.length)
// 100 at a time
await Promise.all( funcs.splice(0, 100).map(f => f()) )
【讨论】:
这是一个被低估的解决方案。喜欢简单。 这将批量运行函数而不是池,其中一个函数在另一个函数完成时立即被调用。 喜欢这个解决方案! 在缺乏更多上下文的情况下花了一秒钟的时间来掌握它在做什么,例如它是一个批处理而不是一个池。每次从开头或中间拼接时,您都会重新排序数组。 (浏览器必须重新索引所有项目)理论性能更好的替代方案是从末尾取东西而不是arr.splice(-100)
如果顺序不正确,也许你可以反转数组:P
对于批量运行非常有用。注意:在当前批次 100% 完成之前,下一批不会开始。【参考方案4】:
如果您知道迭代器是如何工作的以及它们是如何被使用的,那么您就不需要任何额外的库,因为您自己构建自己的并发会变得非常容易。让我演示一下:
/* [Symbol.iterator]() is equivalent to .values()
const iterator = [1,2,3][Symbol.iterator]() */
const iterator = [1,2,3].values()
// loop over all items with for..of
for (const x of iterator)
console.log('x:', x)
// notices how this loop continues the same iterator
// and consumes the rest of the iterator, making the
// outer loop not logging any more x's
for (const y of iterator)
console.log('y:', y)
我们可以使用相同的迭代器并在工作人员之间共享。
如果您使用.entries()
而不是.values()
,您将获得一个带有[[index, value]]
的二维数组,我将在下面以 2 的并发性进行演示
const sleep = t => new Promise(rs => setTimeout(rs, t))
async function doWork(iterator)
for (let [index, item] of iterator)
await sleep(1000)
console.log(index + ': ' + item)
const iterator = Array.from('abcdefghij').entries()
const workers = new Array(2).fill(iterator).map(doWork)
// ^--- starts two workers sharing the same iterator
Promise.allSettled(workers).then(() => console.log('done'))
这样做的好处是您可以拥有generator function,而不是一次准备好一切。
更棒的是,您可以在节点中执行stream.Readable.from(iterator)
(最终也可以在 whatwg 流中)。并且使用可转移的 ReadbleStream,如果您正在与网络工作者一起工作以进行表演,这使得该功能在该功能中非常有用
注意: 与示例 async-pool 相比,它的不同之处在于它产生了两个工作人员,因此如果一个工作人员由于某种原因在索引 5 处抛出错误,它不会停止另一个工作人员工人做剩下的。所以你从 2 个并发减少到 1 个。(所以它不会停在那里)所以我的建议是你在 doWork
函数中捕获所有错误
【讨论】:
这太棒了!谢谢无尽! 这绝对是一个很酷的方法!只要确保你的并发不超过你的任务列表的长度(如果你关心结果的话),因为你最终可能会得到额外的! 当 Streams 获得 Readable.from(iterator) 支持时,可能会更酷一些。 Chrome 已经制作了流 transferable。因此您可以创建可读的流并将其发送给网络工作者,所有这些最终都将使用相同的底层迭代器。 它绝对应该作为 NPM 模块发布,我想使用它。 @KrisOye 当并发超过任务列表的长度时,我不会重现问题。我刚刚尝试用 20 个工人运行代码 sn-p (new Array(20)
);它按要求完成,没有额外内容。 (额外的工人立即完成,因为迭代器在他们启动之前就完成了。)【参考方案5】:
不要使用 promise 来限制 http 请求,而是使用 node 的内置 http.Agent.maxSockets。这消除了使用库或编写自己的池代码的要求,并具有额外的优势,可以更好地控制您的限制。
agent.maxSockets
默认设置为无穷大。确定代理可以为每个源打开多少个并发套接字。 Origin 是 'host:port' 或 'host:port:localAddress' 组合。
例如:
var http = require('http');
var agent = new http.Agent(maxSockets: 5); // 5 concurrent connections per origin
var request = http.request(..., agent: agent, ...);
如果向同一来源发出多个请求,将 keepAlive
设置为 true 也可能对您有所帮助(有关更多信息,请参阅上面的文档)。
【讨论】:
仍然,立即创建数千个闭包并池化套接字似乎效率不高?【参考方案6】:bluebird 的Promise.map 可以采用并发选项来控制应并行运行的承诺数量。有时它比.all
更容易,因为您不需要创建promise 数组。
const Promise = require('bluebird')
function getCounts()
return Promise.map(users, user =>
return new Promise(resolve =>
remoteServer.getCount(user) // makes an HTTP request
.then(() =>
/* snip */
resolve();
);
);
, concurrency: 10); // <---- at most 10 http requests at a time
【讨论】:
如果你需要更快的承诺,bluebird 会很高兴,如果你只将它用于一件事,那么它会额外增加约 18kb 的垃圾;) 一切都取决于一件事对你来说有多重要,以及是否有其他更快/更好的方法。一个典型的权衡。我会选择易于使用和功能超过几 kb,但 YMMV。【参考方案7】:我建议库异步池:https://github.com/rxaviers/async-pool
npm install tiny-async-pool
说明:
使用原生 ES6/ES7 以有限的并发性运行多个 promise-returning 和异步函数
asyncPool 在有限的并发池中运行多个 promise-returning & async 函数。一旦其中一个承诺被拒绝,它就会立即拒绝。它在所有承诺完成时解决。它尽快(在并发限制下)调用迭代器函数。
用法:
const timeout = i => new Promise(resolve => setTimeout(() => resolve(i), i));
await asyncPool(2, [1000, 5000, 3000, 2000], timeout);
// Call iterator (i = 1000)
// Call iterator (i = 5000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 1000 finishes
// Call iterator (i = 3000)
// Pool limit of 2 reached, wait for the quicker one to complete...
// 3000 finishes
// Call iterator (i = 2000)
// Itaration is complete, wait until running ones complete...
// 5000 finishes
// 2000 finishes
// Resolves, results are passed in given array order `[1000, 5000, 3000, 2000]`.
【讨论】:
为我工作。谢谢。这是一个很棒的图书馆。 我需要用Promise.allSettled()
处理拒绝,所以这不起作用【参考方案8】:
这是我的 ES7 解决方案,可用于复制粘贴友好且功能完整的 Promise.all()
/map()
替代方案,具有并发限制。
类似于Promise.all()
,它维护返回顺序以及非承诺返回值的后备。
我还对不同的实现进行了比较,因为它说明了一些其他解决方案遗漏的一些方面。
用法
const asyncFn = delay => new Promise(resolve => setTimeout(() => resolve(), delay));
const args = [30, 20, 15, 10];
await asyncPool(args, arg => asyncFn(arg), 4); // concurrency limit of 4
实施
async function asyncBatch(args, fn, limit = 8)
// Copy arguments to avoid side effects
args = [...args];
const outs = [];
while (args.length)
const batch = args.splice(0, limit);
const out = await Promise.all(batch.map(fn));
outs.push(...out);
return outs;
async function asyncPool(args, fn, limit = 8)
return new Promise((resolve) =>
// Copy arguments to avoid side effect, reverse queue as
// pop is faster than shift
const argQueue = [...args].reverse();
let count = 0;
const outs = [];
const pollNext = () =>
if (argQueue.length === 0 && count === 0)
resolve(outs);
else
while (count < limit && argQueue.length)
const index = args.length - argQueue.length;
const arg = argQueue.pop();
count += 1;
const out = fn(arg);
const processOut = (out, index) =>
outs[index] = out;
count -= 1;
pollNext();
;
if (typeof out === 'object' && out.then)
out.then(out => processOut(out, index));
else
processOut(out, index);
;
pollNext();
);
比较
// A simple async function that returns after the given delay
// and prints its value to allow us to determine the response order
const asyncFn = delay => new Promise(resolve => setTimeout(() =>
console.log(delay);
resolve(delay);
, delay));
// List of arguments to the asyncFn function
const args = [30, 20, 15, 10];
// As a comparison of the different implementations, a low concurrency
// limit of 2 is used in order to highlight the performance differences.
// If a limit greater than or equal to args.length is used the results
// would be identical.
// Vanilla Promise.all/map combo
const out1 = await Promise.all(args.map(arg => asyncFn(arg)));
// prints: 10, 15, 20, 30
// total time: 30ms
// Pooled implementation
const out2 = await asyncPool(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 15, 10
// total time: 40ms
// Batched implementation
const out3 = await asyncBatch(args, arg => asyncFn(arg), 2);
// prints: 20, 30, 20, 30
// total time: 45ms
console.log(out1, out2, out3); // prints: [30, 20, 15, 10] x 3
// Conclusion: Execution order and performance is different,
// but return order is still identical
结论
asyncPool()
应该是最好的解决方案,因为它允许在前一个请求完成后立即启动新请求。
asyncBatch()
包含在比较中,因为它的实现更易于理解,但性能应该更慢,因为需要完成同一批次中的所有请求才能开始下一批。
在这个人为设计的例子中,无限制的 vanilla Promise.all()
当然是最快的,而其他的在现实世界的拥塞情况下可能表现得更理想。
更新
其他人已经建议的异步池库可能是我实现的更好替代方案,因为它的工作原理几乎相同,并且通过巧妙地使用 Promise.race() 实现更简洁:https://github.com/rxaviers/async-pool/blob/master/lib/es7.js
希望我的回答仍然具有教育价值。
【讨论】:
【参考方案9】:Semaphore 是众所周知的并发原语,旨在解决类似问题。它是非常通用的构造,信号量的实现存在于多种语言中。这就是使用 Semaphore 解决此问题的方式:
async function main()
const s = new Semaphore(100);
const res = await Promise.all(
entities.map((users) =>
s.runExclusive(() => remoteServer.getCount(user))
)
);
return res;
我正在使用来自 async-mutex 的 Semaphore 实现,它有不错的文档和 TypeScript 支持。
如果您想深入了解此类主题,可以查看“信号量小书”一书,该书可通过PDF here免费获得
【讨论】:
【参考方案10】:这是流式传输和“p-limit”的基本示例。它将 http 读取流传输到 mongo db。
const stream = require('stream');
const util = require('util');
const pLimit = require('p-limit');
const es = require('event-stream');
const streamToMongoDB = require('stream-to-mongo-db').streamToMongoDB;
const pipeline = util.promisify(stream.pipeline)
const outputDBConfig =
dbURL: 'yr-db-url',
collection: 'some-collection'
;
const limit = pLimit(3);
async yrAsyncStreamingFunction(readStream) =>
const mongoWriteStream = streamToMongoDB(outputDBConfig);
const mapperStream = es.map((data, done) =>
let someDataPromise = limit(() => yr_async_call_to_somewhere())
someDataPromise.then(
function handleResolve(someData)
data.someData = someData;
done(null, data);
,
function handleError(error)
done(error)
);
)
await pipeline(
readStream,
JSONStream.parse('*'),
mapperStream,
mongoWriteStream
);
【讨论】:
【参考方案11】:可以用递归来解决。
这个想法是,最初您发送最大允许数量的请求,并且这些请求中的每一个都应该在完成时递归地继续发送自己。
function batchFetch(urls, concurrentRequestsLimit)
return new Promise(resolve =>
var documents = [];
var index = 0;
function recursiveFetch()
if (index === urls.length)
return;
fetch(urls[index++]).then(r =>
documents.push(r.text());
if (documents.length === urls.length)
resolve(documents);
else
recursiveFetch();
);
for (var i = 0; i < concurrentRequestsLimit; i++)
recursiveFetch();
);
var sources = [
'http://www.example_1.com/',
'http://www.example_2.com/',
'http://www.example_3.com/',
...
'http://www.example_100.com/'
];
batchFetch(sources, 5).then(documents =>
console.log(documents);
);
【讨论】:
这种方法完全忽略了错误处理。【参考方案12】: @tcooc 的回答很酷。不知道,将来会利用它。 我也很喜欢 @MatthewRideout 的回答,但它使用了外部库!!只要有可能,我都会尝试自己开发这类东西,而不是去图书馆。你最终会学到很多以前看起来令人生畏的概念。
class Pool
constructor(maxAsync)
this.maxAsync = maxAsync;
this.asyncOperationsQueue = [];
this.currentAsyncOperations = 0
runAnother()
if (this.asyncOperationsQueue.length > 0 && this.currentAsyncOperations < this.maxAsync)
this.currentAsyncOperations += 1;
this.asyncOperationsQueue.pop()()
.then(() => this.currentAsyncOperations -= 1; this.runAnother() , () => this.currentAsyncOperations -= 1; this.runAnother() )
add(f) // the argument f is a function of signature () => Promise
this.runAnother();
return new Promise((resolve, reject) =>
this.asyncOperationsQueue.push(
() => f().then(resolve).catch(reject)
)
)
//#######################################################
// TESTS
//#######################################################
function dbCall(id, timeout, fail)
return new Promise((resolve, reject) =>
setTimeout(() =>
if (fail)
reject(`Error for id $id`);
else
resolve(id);
, timeout)
)
const dbQuery1 = () => dbCall(1, 5000, false);
const dbQuery2 = () => dbCall(2, 5000, false);
const dbQuery3 = () => dbCall(3, 5000, false);
const dbQuery4 = () => dbCall(4, 5000, true);
const dbQuery5 = () => dbCall(5, 5000, false);
const cappedPool = new Pool(2);
const dbQuery1Res = cappedPool.add(dbQuery1).catch(i => i).then(i => console.log(`Resolved: $i`))
const dbQuery2Res = cappedPool.add(dbQuery2).catch(i => i).then(i => console.log(`Resolved: $i`))
const dbQuery3Res = cappedPool.add(dbQuery3).catch(i => i).then(i => console.log(`Resolved: $i`))
const dbQuery4Res = cappedPool.add(dbQuery4).catch(i => i).then(i => console.log(`Resolved: $i`))
const dbQuery5Res = cappedPool.add(dbQuery5).catch(i => i).then(i => console.log(`Resolved: $i`))
这种方法提供了一个不错的 API,类似于 scala/java 中的线程池。
在使用 const cappedPool = new Pool(2)
创建池的一个实例后,您只需使用 cappedPool.add(() => myPromise)
向它提供承诺。
不知不觉我们必须确保 promise 不会立即启动,这就是为什么我们必须借助函数“延迟提供”。
最重要的是,请注意 add
方法的结果是一个 Promise,它将以你原来的 Promise 的值完成/解决!这使得使用非常直观。
const resultPromise = cappedPool.add( () => dbCall(...))
resultPromise
.then( actualResult =>
// Do something with the result form the DB
)
【讨论】:
【参考方案13】:正如此答案线程中的所有其他人所指出的,如果您需要限制并发性,Promise.all()
不会做正确的事情。但理想情况下,您甚至不应该想要等到所有的 Promise 完成后再处理它们。
相反,您希望在每个结果可用时尽快对其进行处理,因此您不必等待最后一个承诺完成后再开始迭代它们。
所以,这里有一个代码示例,它部分基于the answer by Endless 和this answer by T.J. Crowder。
async function* raceAsyncIterators(iterators)
async function queueNext(iteratorResult)
delete iteratorResult.result; // Release previous result ASAP
iteratorResult.result = await iteratorResult.iterator.next();
return iteratorResult;
;
const iteratorResults = new Map(iterators.map(iterator => [
iterator,
queueNext( iterator )
]));
while (iteratorResults.size)
const winner = await Promise.race(iteratorResults.values());
if (winner.result.done)
iteratorResults.delete(winner.iterator);
else
const value = winner.result;
iteratorResults.set(winner.iterator, queueNext(winner));
yield value;
async function* runTasks(maxConcurrency, iterator)
// Each worker is an async generator that polls for tasks
// from the shared iterator.
// Sharing the iterator ensures that each worker gets unique tasks.
const workers = new Array(maxConcurrency);
for (let i = 0; i < maxConcurrency; i++)
workers[i] = (async function*()
for (const task of iterator) yield await task();
)();
yield* raceAsyncIterators(workers);
// example tasks that sleep and return a number
function sleep(ms) return new Promise(r => setTimeout(r, ms));
const tasks = [];
for (let i = 0; i < 20; i++)
tasks.push(async () =>
console.log(`start $i`);
await sleep(Math.random() * 1000);
console.log(`end $i`);
return i;
);
(async () =>
for await (let value of runTasks(3, tasks.values()))
console.log(`output $value`);
)()
这里有很多魔法;让我解释一下。
每个“worker”都被声明为来自共享迭代器的async generator(async function*()
)轮询。
// Each worker is an async iterator that polls for tasks
// from the shared iterator.
// Sharing the iterator ensures that each worker gets unique tasks.
const workers = new Array(maxConcurrency);
for (let i = 0; i < maxConcurrency; i++)
workers[i] = (async function*()
for (const task of iterator) yield await task();
)();
如果我们只有一个工人,我们可以调用for await (let result of worker())
来获取结果流。
但是,由于我们有 N 个工作迭代器,我们想要竞速它们,处理最先产生结果的迭代器的结果。这就是raceAsyncIterators()
函数的作用。类似于Promise.race()
,但有 N 个 Promises 迭代器,而不仅仅是 N 个 Promises。
raceAsyncIterators()
异步生成器函数保留一个 iteratorResults
映射,从 iterators
(作为键)映射到 iteratorResult
对象的 Promise;每个iteratorResult
都有一个iterator
属性和一个result
(等待迭代器的next()
Promise 的结果)。
raceAsyncIterators()
调用Promise.race()
让iteratorResult
Promise 竞相完成他们的任务。当获胜的iteratorResult
说它的迭代器完全是done
时,我们将其从地图中删除;否则,我们将其在 iteratorResults
映射中的 Promise 替换为迭代器的 next()
Promise 和 yield
result
值。
使用runTasks()
,我们可以使用for await
循环,如下所示:
for await (const value of runTasks(3, tasks.values()))
console.log(`output $value`);
这会按照解决顺序返回每个 Promise 的 value
。
在示例中,我们生成一个包含 20 个异步任务的数组,sleep
持续随机时间并返回一个数字。 (在现实生活中,你可能会创建一个fetch
URL 之类的函数数组。)
该示例调用 runTasks
并有 3 个并发工作人员,因此同时启动的任务不超过 3 个。当任何任务完成时,我们立即将下一个任务排队。 (这优于“批处理”,您一次执行 3 个任务,等待所有三个任务,并且在整个前一批完成之前不要开始下一批三个。)
【讨论】:
我设法使用这个函数来运行一个最大项目的承诺池,它的性能非常好,而且与@supercharge/promise-pool
这样的解决方案相比,它的开销很低,这个答案值得更多投票【参考方案14】:
所以我尝试为我的代码制作一些示例,但由于这仅适用于导入脚本而不是生产代码,因此使用 npm 包batch-promises 对我来说无疑是最简单的路径
注意:需要运行时来支持 Promise 或被 polyfill。
API batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee) Promise:Iteratee 将在每批之后被调用。
用途:
batch-promises
Easily batch promises
NOTE: Requires runtime to support Promise or to be polyfilled.
Api
batchPromises(int: batchSize, array: Collection, i => Promise: Iteratee)
The Promise: Iteratee will be called after each batch.
Use:
import batchPromises from 'batch-promises';
batchPromises(2, [1,2,3,4,5], i => new Promise((resolve, reject) =>
// The iteratee will fire after each batch resulting in the following behaviour:
// @ 100ms resolve items 1 and 2 (first batch of 2)
// @ 200ms resolve items 3 and 4 (second batch of 2)
// @ 300ms resolve remaining item 5 (last remaining batch)
setTimeout(() =>
resolve(i);
, 100);
))
.then(results =>
console.log(results); // [1,2,3,4,5]
);
【讨论】:
【参考方案15】:如果你不想使用外部库,递归就是答案
downloadAll(someArrayWithData)
var self = this;
var tracker = function(next)
return self.someExpensiveRequest(someArrayWithData[next])
.then(function()
next++;//This updates the next in the tracker function parameter
if(next < someArrayWithData.length)//Did I finish processing all my data?
return tracker(next);//Go to the next promise
);
return tracker(0);
【讨论】:
【参考方案16】:很遗憾,原生 Promise.all 无法做到这一点,所以你必须要有创意。
这是我能找到的最快最简洁的方法,无需使用任何外部库。
它利用了一个新的 javascript 特性,称为迭代器。迭代器基本上会跟踪哪些项目已处理,哪些尚未处理。
为了在代码中使用它,您需要创建一个异步函数数组。每个异步函数都向同一个迭代器询问下一个需要处理的项目。每个函数异步处理自己的项目,完成后向迭代器询问一个新项目。一旦迭代器用完项目,所有的功能就完成了。
感谢@Endless 的启发。
var items = [
"https://www.***.com",
"https://www.***.com",
"https://www.***.com",
"https://www.***.com",
"https://www.***.com",
"https://www.***.com",
"https://www.***.com",
"https://www.***.com",
];
var concurrency = 5
Array(concurrency).fill(items.entries()).map(async (cursor) =>
for(let [index, url] of cursor)
console.log("getting url is ", index, url);
// run your async task instead of this next line
var text = await fetch(url).then(res => res.text());
console.log("text is", text.slice(0,20));
)
【讨论】:
很好奇为什么这被标记了。这与我想出的非常相似。【参考方案17】:这么多好的解决方案。我从@Endless 发布的优雅解决方案开始,最终得到了这个不使用任何外部库也不批量运行的小扩展方法(尽管假设您具有异步等功能):
Promise.allWithLimit = async (taskList, limit = 5) =>
const iterator = taskList.entries();
let results = new Array(taskList.length);
let workerThreads = new Array(limit).fill(0).map(() =>
new Promise(async (resolve, reject) =>
try
let entry = iterator.next();
while (!entry.done)
let [index, promise] = entry.value;
try
results[index] = await promise;
entry = iterator.next();
catch (err)
results[index] = err;
// No more work to do
resolve(true);
catch (err)
// This worker is dead
reject(err);
));
await Promise.all(workerThreads);
return results;
;
Promise.allWithLimit = async (taskList, limit = 5) =>
const iterator = taskList.entries();
let results = new Array(taskList.length);
let workerThreads = new Array(limit).fill(0).map(() =>
new Promise(async (resolve, reject) =>
try
let entry = iterator.next();
while (!entry.done)
let [index, promise] = entry.value;
try
results[index] = await promise;
entry = iterator.next();
catch (err)
results[index] = err;
// No more work to do
resolve(true);
catch (err)
// This worker is dead
reject(err);
));
await Promise.all(workerThreads);
return results;
;
const demoTasks = new Array(10).fill(0).map((v,i) => new Promise(resolve =>
let n = (i + 1) * 5;
setTimeout(() =>
console.log(`Did nothing for $n seconds`);
resolve(n);
, n * 1000);
));
var results = Promise.allWithLimit(demoTasks);
【讨论】:
【参考方案18】:扩展@deceleratedcaviar 发布的答案,我创建了一个“批处理”实用程序函数,该函数以参数为参数:值数组、并发限制和处理函数。是的,我意识到使用 Promise.all 这种方式更类似于批处理与真正的并发性,但如果目标是一次限制过多的 HTTP 调用,我会选择这种方法,因为它简单且不需要外部库.
async function batch(o)
let arr = o.arr
let resp = []
while (arr.length)
let subset = arr.splice(0, o.limit)
let results = await Promise.all(subset.map(o.process))
resp.push(results)
return [].concat.apply([], resp)
let arr = []
for (let i = 0; i < 250; i++) arr.push(i)
async function calc(val) return val * 100
(async () =>
let resp = await batch(
arr: arr,
limit: 100,
process: calc
)
console.log(resp)
)();
【讨论】:
【参考方案19】:使用自定义承诺库 (CPromise) 的另一种解决方案:
使用生成器Live codesandbox demo import CPromise from "c-promise2";
import cpFetch from "cp-fetch";
const promise = CPromise.all(
function* ()
const urls = [
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
];
for (const url of urls)
yield cpFetch(url); // add a promise to the pool
console.log(`Request [$url] completed`);
,
concurrency: 2
).then(
(v) => console.log(`Done: `, v),
(e) => console.warn(`Failed: $e`)
);
// yeah, we able to cancel the task and abort pending network requests
// setTimeout(() => promise.cancel(), 4500);
使用映射器Live codesandbox demo
import CPromise from "c-promise2";
import cpFetch from "cp-fetch";
const promise = CPromise.all(
[
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=1",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=2",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=3",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=4",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=5",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=6",
"https://run.mocky.io/v3/7b038025-fc5f-4564-90eb-4373f0721822?mocky-delay=2s&x=7"
],
mapper: (url) =>
console.log(`Request [$url]`);
return cpFetch(url);
,
concurrency: 2
).then(
(v) => console.log(`Done: `, v),
(e) => console.warn(`Failed: $e`)
);
// yeah, we able to cancel the task and abort pending network requests
//setTimeout(() => promise.cancel(), 4500);
【讨论】:
【参考方案20】:警告这尚未针对效率进行基准测试,并且会进行大量数组复制/创建
如果您想要更实用的方法,您可以执行以下操作:
import chunk from 'lodash.chunk';
const maxConcurrency = (max) => (dataArr, promiseFn) =>
chunk(dataArr, max).reduce(
async (agg, batch) => [
...(await agg),
...(await Promise.all(batch.map(promiseFn)))
],
[]
);
然后你可以像这样使用它:
const randomFn = (data) =>
new Promise((res) => setTimeout(
() => res(data + 1),
Math.random() * 1000
));
const result = await maxConcurrency(5)(
[1, 2, 3, 4, 5, 6, 7, 8, 9, 10],
randomFn
);
console.log('result+++', result);
【讨论】:
【参考方案21】:我一直在使用 bottleneck 库,我真的很喜欢它,但在我的情况下,它并没有释放内存,而是一直在处理长时间运行的作业……这对于运行你可能的大量作业来说并不好首先需要一个节流/并发库。
我需要一个简单、开销低、易于维护的解决方案。我还想要让池保持充值的东西,而不是简单地批处理预定义的块......在下载器的情况下,这将阻止 nGB 文件阻止你的队列几分钟/小时一次,即使其余的批次在很久以前就完成了。
这是我一直在使用的 Node.js v16+、无依赖、异步生成器解决方案:
const promiseState = function( promise )
// A promise could never resolve to a unique symbol unless it was in this scope
const control = Symbol();
// This helps us determine the state of the promise... A little heavy, but it beats a third-party promise library. The control is the second element passed to Promise.race() since it will only resolve first if the promise being tested is pending.
return Promise
.race([ promise, control ])
.then( value => ( value === control ) ? 'pending' : 'fulfilled' )
.catch( () => 'rejected' );
const throttle = async function* ( reservoir, promiseFunction, highWaterMark )
let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseFunction( item ) );
while ( iterable.length > 0 )
// When a promise has resolved we have space to top it up to the high water mark...
await Promise.any( iterable );
const pending = [];
const resolved = [];
// This identifies the promise(s) that have resolved so that we can yield them
for ( const currentValue of iterable )
if ( await promiseState( currentValue ) === 'pending' )
pending.push( currentValue );
else
resolved.push( currentValue );
// Put the remaining promises back into iterable, and top it to the high water mark
iterable = [
...pending,
...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseFunction( value ) )
];
yield Promise.allSettled( resolved );
// This is just an example of what would get passed as "promiseFunction"... This can be the function that returns your HTTP request promises
const getTimeout = delay => new Promise( (resolve, reject) => setTimeout(resolve, delay, delay) );
// This is just the async IIFE that bootstraps this example
( async () =>
const test = [ 1000, 2000, 3000, 4000, 5000, 6000, 1500, 2500, 3500, 4500, 5500, 6500 ];
for await ( const timeout of throttle( test, getTimeout, 4 ) )
console.log( timeout );
)();
【讨论】:
【参考方案22】:我有创建块并使用 .reduce 函数等待每个块 promise.alls 完成的解决方案。如果承诺有一些调用限制,我也会添加一些延迟。
export function delay(ms: number)
return new Promise<void>((resolve) => setTimeout(resolve, ms));
export const chunk = <T>(arr: T[], size: number): T[][] => [
...Array(Math.ceil(arr.length / size)),
].map((_, i) => arr.slice(size * i, size + size * i));
const myIdlist = []; // all items
const groupedIdList = chunk(myIdList, 20); // grouped by 20 items
await groupedIdList.reduce(async (prev, subIdList) =>
await prev;
// Make sure we wait for 500 ms after processing every page to prevent overloading the calls.
const data = await Promise.all(subIdList.map(myPromise));
await delay(500);
, Promise.resolve());
【讨论】:
【参考方案23】:此解决方案使用async generator 来管理带有香草 JavaScript 的并发承诺。 throttle
生成器接受 3 个参数:
Promise 仅在需要时实例化,以减少内存消耗。结果可以使用for await...of 语句进行迭代。
下面的示例提供了一个检查 Promise 状态的函数、节流异步生成器,以及一个基于 setTimeout 返回 Promise 的简单函数。最后的async IIFE 定义了超时值的存储库,设置throttle
返回的async iterable,然后在结果解析时迭代它们。
如果您想要更完整的 HTTP 请求示例,请在 cmets 中告诉我。
请注意,Node.js 16+ 是必需的才能使用异步生成器。
const promiseState = function( promise )
const control = Symbol();
return Promise
.race([ promise, control ])
.then( value => ( value === control ) ? 'pending' : 'fulfilled' )
.catch( () => 'rejected' );
const throttle = async function* ( reservoir, promiseClass, highWaterMark )
let iterable = reservoir.splice( 0, highWaterMark ).map( item => promiseClass( item ) );
while ( iterable.length > 0 )
await Promise.any( iterable );
const pending = [];
const resolved = [];
for ( const currentValue of iterable )
if ( await promiseState( currentValue ) === 'pending' )
pending.push( currentValue );
else
resolved.push( currentValue );
console.log( pending, resolved, reservoir );
iterable = [
...pending,
...reservoir.splice( 0, highWaterMark - pending.length ).map( value => promiseClass( value ) )
];
yield Promise.allSettled( resolved );
const getTimeout = delay => new Promise( ( resolve, reject ) =>
setTimeout(resolve, delay, delay);
);
( async () =>
const test = [ 1100, 1200, 1300, 10000, 11000, 9000, 5000, 6000, 3000, 4000, 1000, 2000, 3500 ];
const throttledRequests = throttle( test, getTimeout, 4 );
for await ( const timeout of throttledRequests )
console.log( timeout );
)();
【讨论】:
【参考方案24】:控制最大承诺/请求数量的一个很好的解决方案是将您的请求列表拆分为页面,并且一次只生成一个页面的请求。
下面的例子使用了iter-ops 库:
import pipe, toAsync, map, page from 'iter-ops';
const i = pipe(
toAsync(users), // make it asynchronous
page(10), // split into pages of 10 items in each
map(p => Promise.all(p.map(u => u.remoteServer.getCount(u)))), // map into requests
wait() // resolve each page in the pipeline
);
// below triggers processing page-by-page:
for await(const p of i)
//=> p = resolved page of data
这样它就不会尝试创建超过一页大小的请求/承诺。
【讨论】:
【参考方案25】:这就是我使用 Promise.race
所做的,在我的代码中
const identifyTransactions = async function()
let promises = []
let concurrency = 0
for (let tx of this.transactions)
if (concurrency > 4)
await Promise.race(promises).then(r => promises = []; concurrency = 0 )
promises.push(tx.identifyTransaction())
concurrency++
if (promises.length > 0)
await Promise.race(promises) //resolve the rest
如果你想看一个例子:https://jsfiddle.net/thecodermarcelo/av2tp83o/5/
【讨论】:
【参考方案26】:如果您的目标是减慢 Promise.all 的速度以避免速率限制或过载:
这是我的实现
async function promiseAllGentle(arr, batchSize = 5, sleep = 50)
let output = [];
while (arr.length)
const batchResult = await Promise.all(arr.splice(0, batchSize));
output = [...output, ...batchResult];
await new Promise((res) => setTimeout(res, sleep));
return output;
【讨论】:
这不会做任何事情,只是在一段时间内错过处理拒绝,导致崩溃。Promise.all
不能放慢速度,它只是等待而不运行任何任务。
你能解释一下为什么它不会返回任何任务,也不会处理拒绝吗?
promiseAllGentle(Array.from(length: 50, (_, i) => console.log(i+' happens right now'); return i == 20 ? Promise.reject("This will be handled too late") : new Promise(resolve => setTimeout(resolve, 50)); ))
谢谢,我正在考虑改用 p-limit 或 p-throttle。为什么要重新发明***;)以上是关于使用 ES6 的 Promise.all() 时限制并发的最佳方法是啥?的主要内容,如果未能解决你的问题,请参考以下文章
ES6 Promise.all() 错误句柄 - 是不是需要 .settle()? [复制]