获取在 Promise.race 中完成的承诺

Posted

技术标签:

【中文标题】获取在 Promise.race 中完成的承诺【英文标题】:Get which promise completed in Promise.race 【发布时间】:2017-08-11 07:25:33 【问题描述】:

上下文:我需要进行大量可并行化的异步调用(想想大约 300 到 3000 个 ajax 调用)。但是,我不想通过一次调用它们来使浏览器或服务器紧张。我也不想按顺序运行它们,因为完成需要很长时间。我决定一次运行五个左右,并派生了这个函数:

async function asyncLoop(asyncFns, concurrent = 5) 
    // queue up simultaneous calls 
    let queue = [];
    for (let fn of asyncFns) 
        // fire the async function and add its promise to the queue
        queue.push(fn());
        // if max concurrent, wait for the oldest one to finish
        if (queue.length >= concurrent) 
            await queue.shift();
        
    
    // wait for the rest of the calls to finish
    await Promise.all(queue);
;

其中 asyncFns 是一个可迭代的(尚未调用的)异步函数。

问题:这可行,但是我发现最老的不是第一个完成的并不总是正确的。我想修改函数,使其使用Promise.race 等到第一个承诺成功,然后从那里继续。但是,我不知道要删除哪个承诺:

        // if max concurrent, wait for the first one to finish
        if (queue.length >= concurrent) 
            await Promise.race(queue);
            // ??? get race's completed promise
            // queue.splice(queue.indexOf(completed), 1);
        

如果我只知道哪个完成的索引,我可以将它从队列中拼接出来(我猜现在更像是一组)。看起来我无法从种族返回的派生承诺中得到最初的承诺。有什么建议吗?

【问题讨论】:

看看this answer 我的看法是这样的:const [ idx, result ] = await Promise.race(promisesArr.map((promise, idx) => promise.then((result) => [ idx, result ]); 不过这不包括例外情况。为了完成它,我有一个方便的特殊函数 (safelyExecuteAsync),它返回一个元组 [error, result] 的承诺。有了它,代码就变成了:const [ idx, [error, result] ] = await Promise.race(promisesArr.map((promise, idx) => safelyExecuteAsync(promise).then((tuple) => [ idx, tuple ]); Promise 返回一个 Promise 对象,但返回的 Promise 对象本身与解析的 Promise 对象不同。它具有相同的值,但包装在不同的 Promise 中。这样想 - Promose.race 函数(或任何其他异步函数)本身有一个设置自己的 Promise 来执行其异步工作。这是来电者唯一能看到的。即使 Promise.race 用作可等待信号量,返回的 Promise 本身也没有用(值可以重复)。 【参考方案1】:

而不是单个队列,为什么不有 5 个“串行”队列

async function asyncLoop(asyncFns, concurrent = 5) 
    const queues = new Array(concurrent).fill(0).map(() => Promise.resolve());
    let index = 0;
    const add = cb => 
        index = (index + 1) % concurrent;
        return queues[index] = queues[index].then(() => cb());
    ;
    let results = [];
    for (let fn of asyncFns) 
        results.push(add(fn));
    
    await Promise.all(results);
;

好的......首先,它并不漂亮,但它似乎工作 - 但是,这假设 asyncFns 是一个 Array - 使用 Object.values 可能很容易“修复”对象

const asyncLoop = (asyncFns, concurrent = 5) => 
    let inFlight = 0;
    let pending = [];
    const end = result => 
        inFlight--;
        var job = pending.shift();
        job && job();
        return result;
    ;
    const begin = (fn) => 
        if (inFlight < concurrent) 
            inFlight++;
            return fn();
        
        let resolver;
        const promise = new Promise(resolve => 
            resolver = () => 
                inFlight ++;
                resolve(fn());
            
        );
        pending.push(resolver);
        return promise;
    
    return Promise.all(asyncFns.map(fn => begin(fn).then(end)));
;

const fns = new Array(25).fill(0).map((v, index) => () => new Promise(resolve => 
    let timeout = 1000;
    if (index == 6  || index == 11) 
        timeout = 2000;
    
    setTimeout(resolve, timeout, index);
));
console.time('timeToComplete');
asyncLoop(fns, 5).then(result => 
    console.timeEnd('timeToComplete');
    console.log(JSON.stringify(result));
);

【讨论】:

有趣的解决方案!我不会想到制作 5 串承诺链。不幸的是,并非所有的电话都需要统一的时间(以及我为什么尝试使用种族)。想象一下 25 个异步函数,24 个需要 1 秒,一个需要 5 秒。我最初的解决方案是用 5 秒打五个电话,然后用 4 秒打另外 20 个电话,总共 9 个电话。此解决方案将有一个需要 9 秒的队列。如果队列真的滚动,则需要 6 秒。但是,这确实比我原来的方法有所改进:如果两个调用是 5 秒,间隔 6 次调用,我原来的需要 13 秒,而这个解决方案需要 9 秒。 是的,我知道答案存在缺陷 - 但是,您确实知道浏览器会限制同时请求的数量,对吧 @JonathanGawrych - 我想我想出了一个解决方案 我感觉Bluebird承诺Promise.map可能是一个解决方案,我相信你可以传递一个并发参数! (该死的我软弱的头脑)【参考方案2】:

感谢@Dan D.,他在发布后不久删除了他们的答案:

let [completed] = await Promise.race(queue.map(p => p.then(res => [p])));

这会为队列中的每个元素创建一个 Promise,当 Promise 完成时返回 Promise。然后通过与那些比赛你得到第一次完成的承诺。

原来completedp 周围没有括号。由于p 是一个promise 并且有一个then 方法,promise 再次被链接,返回promise 的resolved 值而不是promise(因此它不起作用)。我认为这就是答案被删除的原因。通过将 Promise 包装在一个数组中,然后使用 Array Destructuring assignment,您可以防止它再次链接,从而获得 Promise。

【讨论】:

我看不出这是怎么回答的——人们预计 25 个请求需要 1 秒,最大并发 5 个请求需要 5 秒最少——然而单独使用该代码片段而不是 await Promise.race(queue); - 25 个请求需要 2 秒才能完成 - 显然不可能 ahhh,我没有看到该代码如何与拼接queue.splice(queue.indexOf(completed), 1); span> 【参考方案3】:

“从队列中移除”步骤应该由完成的承诺本身(使用then)发生,而不是依赖于从Promise.race 返回的承诺。看来这是解决它的唯一方法。

async function asyncLoop(asyncFns, concurrent = 5) 
    // queue up simultaneous calls 
    let queue = [];
    let ret = [];
    for (let fn of asyncFns) 
        // fire the async function, add its promise to the queue, and remove
        // it from queue when complete
        const p = fn().then(res => 
            queue.splice(queue.indexOf(p), 1);
            return res;
        );
        queue.push(p);
        ret.push(p);
        // if max concurrent, wait for one to finish
        if (queue.length >= concurrent) 
            await Promise.race(queue);
        
    
    // wait for the rest of the calls to finish
    await Promise.all(queue);
;

Npm 模块:https://github.com/rxaviers/async-pool

【讨论】:

虽然最初接受的答案(社区 wiki)有效,但这个答案具有额外的效率优势(您不会重新映射队列中的每个承诺)并且更简单/更直观。【参考方案4】:

我想要类似的东西,但我对这些答案都不满意。

这是我想出的。它不能完全回答您的问题,但可能会帮助您解决问题。

它使用类似于 Jonathan Gawrych 的答案。

也许这对其他人有帮助:

/**
 * Used like:
 * dealWithPromisesAsTheyResolve([
 *   new Promise((res, rej) => setTimeout(res, 2000, 2000)),
 *   new Promise((res, rej) => setTimeout(res, 1000, 1000)),
 *   new Promise((res, rej) => setTimeout(res, 4000, 4000)),
 *   new Promise((res, rej) => setTimeout(res,    0,    0)),
 *   new Promise((res, rej) => setTimeout(rej, 3000, 3000)),
 * ], num => console.log(num), err => console.log(`error: $err`));
 *
 * Will output:
 *   0
 *   1000
 *   2000
 *   error: 3000
 *   4000
 */

async function dealWithPromisesAsTheyResolve(promises, resolveCallback, rejectCallback) 
  var _promises = new Map();
  promises.forEach(promise => _promises.set(
    promise,
    promise
      .then(value => [null, value, promise])
      .catch(error => [error, null, promise])
  ));

  while (_promises.size > 0) 
    let [error, value, promise] = await Promise.race(_promises.values());
    _promises.delete(promise);
    if (error) 
      rejectCallback(error);
     else 
      resolveCallback(value);
    
  

您可以修改它以接受限制并在每次完成时添加新的新承诺。

【讨论】:

【参考方案5】:

这是一个简约的实现,它返回赢得Promise.race 的承诺。它使用 javascript 迭代器,因此不会创建新的数组/映射:

/**
 * When any promise is resolved or rejected, 
 * returns that promise as the result.
 * @param  Iterable.<Promise> iterablePromises An iterable of promises.
 * @return winner: Promise The winner promise.
 */
async function whenAny(iterablePromises) 
  let winner;

  await Promise.race(function* getRacers() 
    for (const p of iterablePromises) 
      if (!p?.then) throw new TypeError();
      const settle = () => winner = winner ?? p;
      yield p.then(settle, settle);
    
  ());

  // return the winner promise as an object property, 
  // to prevent automatic promise "unwrapping"
  return  winner ; 


// test it

function createTimeout(ms) 
  return new Promise(resolve => 
    setTimeout(() => resolve(ms), ms));


async function main() 
  const p = createTimeout(500);
  const result = await whenAny([
    createTimeout(1000),
    createTimeout(1500),
    p
  ]);

  console.assert(result.winner === p);
  console.log(await result.winner);


main().catch(e => console.warn(`caught on main: $e.message`));

【讨论】:

【参考方案6】:

完全受到 Jonathan Gawrych 的启发, 这是我为处理无限的承诺流所做的, 其中我想让 10 个始终并行运行

async function* concurrentResolver(promisesIter, numInParallel) 
  const pending = [];

  for (let i = 0; i < numInParallel; i++) 
    const next = promisesIter.next();
    if (next.done) 
      break;
    
    pending.push(next.value);
  

  while (pending.length) 
    const darkMagic = pending.map((p) => p.then((_) => [p]));
    const [promise] = await Promise.race(darkMagic);
    pending.splice(pending.indexOf(promise), 1);

    const next = promisesIter.next();
    if (!next.done) 
      pending.push(next.value);
    

    // the following `await` is instantaneous, since
    // the promise has already been resolved.
    yield await promise;
  

这里有一些代码来测试它:

function* promisesGenerator( howMany, timeEachResolves ) 
  for (let i = 0; i < howMany; i++) 
    yield new Promise((res) =>
      setTimeout(res, timeEachResolves, "fake server res")
    );
  


const promisesIter = promisesGenerator( howMany: 30, timeEachResolves: 3000 );
const numInParallel = 10;

for await (const res of concurrentResolver(promisesIter, numInParallel)) 
  console.log(`at $new Date().toLocaleTimeString(): $res`);


/*
Output from Chrome's console:
(10) at 7:06:44 PM: fake server res
(10) at 7:06:47 PM: fake server res
(10) at 7:06:50 PM: fake server res
*/

【讨论】:

以上是关于获取在 Promise.race 中完成的承诺的主要内容,如果未能解决你的问题,请参考以下文章

[RN] React Native Fetch请求设置超时

Promise

获得第一个兑现的承诺

在promise中超时函数的最佳一般做法是啥[关闭]

Promise.all和Promise.race的区别和使用

Promise.all和Promise.race区别,和使用场景