获取在 Promise.race 中完成的承诺
Posted
技术标签:
【中文标题】获取在 Promise.race 中完成的承诺【英文标题】:Get which promise completed in Promise.race 【发布时间】:2017-08-11 07:25:33 【问题描述】:上下文:我需要进行大量可并行化的异步调用(想想大约 300 到 3000 个 ajax 调用)。但是,我不想通过一次调用它们来使浏览器或服务器紧张。我也不想按顺序运行它们,因为完成需要很长时间。我决定一次运行五个左右,并派生了这个函数:
async function asyncLoop(asyncFns, concurrent = 5)
// queue up simultaneous calls
let queue = [];
for (let fn of asyncFns)
// fire the async function and add its promise to the queue
queue.push(fn());
// if max concurrent, wait for the oldest one to finish
if (queue.length >= concurrent)
await queue.shift();
// wait for the rest of the calls to finish
await Promise.all(queue);
;
其中 asyncFns 是一个可迭代的(尚未调用的)异步函数。
问题:这可行,但是我发现最老的不是第一个完成的并不总是正确的。我想修改函数,使其使用Promise.race 等到第一个承诺成功,然后从那里继续。但是,我不知道要删除哪个承诺:
// if max concurrent, wait for the first one to finish
if (queue.length >= concurrent)
await Promise.race(queue);
// ??? get race's completed promise
// queue.splice(queue.indexOf(completed), 1);
如果我只知道哪个完成的索引,我可以将它从队列中拼接出来(我猜现在更像是一组)。看起来我无法从种族返回的派生承诺中得到最初的承诺。有什么建议吗?
【问题讨论】:
看看this answer 我的看法是这样的:const [ idx, result ] = await Promise.race(promisesArr.map((promise, idx) => promise.then((result) => [ idx, result ]);
不过这不包括例外情况。为了完成它,我有一个方便的特殊函数 (safelyExecuteAsync
),它返回一个元组 [error, result] 的承诺。有了它,代码就变成了:const [ idx, [error, result] ] = await Promise.race(promisesArr.map((promise, idx) => safelyExecuteAsync(promise).then((tuple) => [ idx, tuple ]);
Promise 返回一个 Promise 对象,但返回的 Promise 对象本身与解析的 Promise 对象不同。它具有相同的值,但包装在不同的 Promise 中。这样想 - Promose.race 函数(或任何其他异步函数)本身有一个设置自己的 Promise 来执行其异步工作。这是来电者唯一能看到的。即使 Promise.race 用作可等待信号量,返回的 Promise 本身也没有用(值可以重复)。
【参考方案1】:
而不是单个队列,为什么不有 5 个“串行”队列
async function asyncLoop(asyncFns, concurrent = 5)
const queues = new Array(concurrent).fill(0).map(() => Promise.resolve());
let index = 0;
const add = cb =>
index = (index + 1) % concurrent;
return queues[index] = queues[index].then(() => cb());
;
let results = [];
for (let fn of asyncFns)
results.push(add(fn));
await Promise.all(results);
;
好的......首先,它并不漂亮,但它似乎工作 - 但是,这假设 asyncFns
是一个 Array - 使用 Object.values
可能很容易“修复”对象
const asyncLoop = (asyncFns, concurrent = 5) =>
let inFlight = 0;
let pending = [];
const end = result =>
inFlight--;
var job = pending.shift();
job && job();
return result;
;
const begin = (fn) =>
if (inFlight < concurrent)
inFlight++;
return fn();
let resolver;
const promise = new Promise(resolve =>
resolver = () =>
inFlight ++;
resolve(fn());
);
pending.push(resolver);
return promise;
return Promise.all(asyncFns.map(fn => begin(fn).then(end)));
;
const fns = new Array(25).fill(0).map((v, index) => () => new Promise(resolve =>
let timeout = 1000;
if (index == 6 || index == 11)
timeout = 2000;
setTimeout(resolve, timeout, index);
));
console.time('timeToComplete');
asyncLoop(fns, 5).then(result =>
console.timeEnd('timeToComplete');
console.log(JSON.stringify(result));
);
【讨论】:
有趣的解决方案!我不会想到制作 5 串承诺链。不幸的是,并非所有的电话都需要统一的时间(以及我为什么尝试使用种族)。想象一下 25 个异步函数,24 个需要 1 秒,一个需要 5 秒。我最初的解决方案是用 5 秒打五个电话,然后用 4 秒打另外 20 个电话,总共 9 个电话。此解决方案将有一个需要 9 秒的队列。如果队列真的滚动,则需要 6 秒。但是,这确实比我原来的方法有所改进:如果两个调用是 5 秒,间隔 6 次调用,我原来的需要 13 秒,而这个解决方案需要 9 秒。 是的,我知道答案存在缺陷 - 但是,您确实知道浏览器会限制同时请求的数量,对吧 @JonathanGawrych - 我想我想出了一个解决方案 我感觉Bluebird承诺Promise.map可能是一个解决方案,我相信你可以传递一个并发参数! (该死的我软弱的头脑)【参考方案2】:感谢@Dan D.,他在发布后不久删除了他们的答案:
let [completed] = await Promise.race(queue.map(p => p.then(res => [p])));
这会为队列中的每个元素创建一个 Promise,当 Promise 完成时返回 Promise。然后通过与那些比赛你得到第一次完成的承诺。
原来completed
或p
周围没有括号。由于p
是一个promise 并且有一个then
方法,promise 再次被链接,返回promise 的resolved 值而不是promise(因此它不起作用)。我认为这就是答案被删除的原因。通过将 Promise 包装在一个数组中,然后使用 Array Destructuring assignment,您可以防止它再次链接,从而获得 Promise。
【讨论】:
我看不出这是怎么回答的——人们预计 25 个请求需要 1 秒,最大并发 5 个请求需要 5 秒最少——然而单独使用该代码片段而不是await Promise.race(queue);
- 25 个请求需要 2 秒才能完成 - 显然不可能
ahhh,我没有看到该代码如何与拼接queue.splice(queue.indexOf(completed), 1);
span>
【参考方案3】:
“从队列中移除”步骤应该由完成的承诺本身(使用then
)发生,而不是依赖于从Promise.race
返回的承诺。看来这是解决它的唯一方法。
async function asyncLoop(asyncFns, concurrent = 5)
// queue up simultaneous calls
let queue = [];
let ret = [];
for (let fn of asyncFns)
// fire the async function, add its promise to the queue, and remove
// it from queue when complete
const p = fn().then(res =>
queue.splice(queue.indexOf(p), 1);
return res;
);
queue.push(p);
ret.push(p);
// if max concurrent, wait for one to finish
if (queue.length >= concurrent)
await Promise.race(queue);
// wait for the rest of the calls to finish
await Promise.all(queue);
;
Npm 模块:https://github.com/rxaviers/async-pool
【讨论】:
虽然最初接受的答案(社区 wiki)有效,但这个答案具有额外的效率优势(您不会重新映射队列中的每个承诺)并且更简单/更直观。【参考方案4】:我想要类似的东西,但我对这些答案都不满意。
这是我想出的。它不能完全回答您的问题,但可能会帮助您解决问题。
它使用类似于 Jonathan Gawrych 的答案。
也许这对其他人有帮助:
/**
* Used like:
* dealWithPromisesAsTheyResolve([
* new Promise((res, rej) => setTimeout(res, 2000, 2000)),
* new Promise((res, rej) => setTimeout(res, 1000, 1000)),
* new Promise((res, rej) => setTimeout(res, 4000, 4000)),
* new Promise((res, rej) => setTimeout(res, 0, 0)),
* new Promise((res, rej) => setTimeout(rej, 3000, 3000)),
* ], num => console.log(num), err => console.log(`error: $err`));
*
* Will output:
* 0
* 1000
* 2000
* error: 3000
* 4000
*/
async function dealWithPromisesAsTheyResolve(promises, resolveCallback, rejectCallback)
var _promises = new Map();
promises.forEach(promise => _promises.set(
promise,
promise
.then(value => [null, value, promise])
.catch(error => [error, null, promise])
));
while (_promises.size > 0)
let [error, value, promise] = await Promise.race(_promises.values());
_promises.delete(promise);
if (error)
rejectCallback(error);
else
resolveCallback(value);
您可以修改它以接受限制并在每次完成时添加新的新承诺。
【讨论】:
【参考方案5】:这是一个简约的实现,它返回赢得Promise.race
的承诺。它使用 javascript 迭代器,因此不会创建新的数组/映射:
/**
* When any promise is resolved or rejected,
* returns that promise as the result.
* @param Iterable.<Promise> iterablePromises An iterable of promises.
* @return winner: Promise The winner promise.
*/
async function whenAny(iterablePromises)
let winner;
await Promise.race(function* getRacers()
for (const p of iterablePromises)
if (!p?.then) throw new TypeError();
const settle = () => winner = winner ?? p;
yield p.then(settle, settle);
());
// return the winner promise as an object property,
// to prevent automatic promise "unwrapping"
return winner ;
// test it
function createTimeout(ms)
return new Promise(resolve =>
setTimeout(() => resolve(ms), ms));
async function main()
const p = createTimeout(500);
const result = await whenAny([
createTimeout(1000),
createTimeout(1500),
p
]);
console.assert(result.winner === p);
console.log(await result.winner);
main().catch(e => console.warn(`caught on main: $e.message`));
【讨论】:
【参考方案6】:完全受到 Jonathan Gawrych 的启发, 这是我为处理无限的承诺流所做的, 其中我想让 10 个始终并行运行。
async function* concurrentResolver(promisesIter, numInParallel)
const pending = [];
for (let i = 0; i < numInParallel; i++)
const next = promisesIter.next();
if (next.done)
break;
pending.push(next.value);
while (pending.length)
const darkMagic = pending.map((p) => p.then((_) => [p]));
const [promise] = await Promise.race(darkMagic);
pending.splice(pending.indexOf(promise), 1);
const next = promisesIter.next();
if (!next.done)
pending.push(next.value);
// the following `await` is instantaneous, since
// the promise has already been resolved.
yield await promise;
这里有一些代码来测试它:
function* promisesGenerator( howMany, timeEachResolves )
for (let i = 0; i < howMany; i++)
yield new Promise((res) =>
setTimeout(res, timeEachResolves, "fake server res")
);
const promisesIter = promisesGenerator( howMany: 30, timeEachResolves: 3000 );
const numInParallel = 10;
for await (const res of concurrentResolver(promisesIter, numInParallel))
console.log(`at $new Date().toLocaleTimeString(): $res`);
/*
Output from Chrome's console:
(10) at 7:06:44 PM: fake server res
(10) at 7:06:47 PM: fake server res
(10) at 7:06:50 PM: fake server res
*/
【讨论】:
以上是关于获取在 Promise.race 中完成的承诺的主要内容,如果未能解决你的问题,请参考以下文章