Nodejs 异步承诺队列

Posted

技术标签:

【中文标题】Nodejs 异步承诺队列【英文标题】:Nodejs Async Promise Queue 【发布时间】:2018-11-03 01:13:24 【问题描述】:

我需要使用速率受限的 API。例如,我在一秒钟内只能进行 10 次 API 调用,所以我需要等待当前秒结束才能进行另一个 API 调用。

为了实现这一点,我想创建一个可以自行管理的异步队列。它的主要功能是让我向队列中添加一个新的 Promise,当 Promise 解决时,应用程序会收到通知:

let queue = new Queue()

queue.add(api.get('/somepath')).then(res =>  // handle response );

如何使用普通的 Promise 实现这一点?

export class AsyncQueue 

    private queue: Array<Promise<any>>;


    add(promise, fct) 
        this.queue.push(promise);
    

    resolveNext() 
        this.queue.pop().then(
            // how to resolve the waiting promise in my application
        )
    

    get length() 
        return this.queue.length
    


【问题讨论】:

这听起来像是 Observables 的工作。 我认为只使用promise就可以做到吗? 看看你的代码,api.get() 会被立即调用。在队列中被pop()ed 后不应该被调用吗? 为什么不将响应处理承诺链接到 api 调用? 可以用一根针和一只稳定的手来完成,但我通常更喜欢经过实战考验、拥有蓬勃发展的社区的 pubsub,而不是我自己的临时实现。 【参考方案1】:

在当前实现中,api.get() 将在add 加入队列时立即被调用。你应该add path 代替(或者可能同时使用api.getpath)并让AsyncQueue 在可行时初始化Promise。确保让 add 返回一个 Promise,一旦 API 调用完成,该 Promise 就会解析。

例如,在 vanilla JS 中,它可能如下所示:

const apiGet = () => new Promise(resolve => setTimeout(resolve, 1000));

class AsyncQueue 
  queue = [];
  constructor() 
    setInterval(this.resolveNext.bind(this), 2000);
  
  add(fn, param) 
    return new Promise(resolve => 
      this.queue.unshift( fn, param, resolve );
    );
  
  resolveNext() 
    if (!this.queue.length) return;
    const  fn, param, resolve  = this.queue.pop();
    fn(param).then(resolve);
  



const queue = new AsyncQueue()
console.log('start');
// Will resolve after 2000 + 1000 seconds:
queue.add(apiGet, '/somepath').then(res => 
  console.log('handling response 1');
);
// Will resolve after 4000 + 1000 seconds:
queue.add(apiGet, '/somepath').then(res => 
  console.log('handling response 2');
);

【讨论】:

这是一个非常好的方法。 我有很多时间等待找到解决我的问题的方法,即在前一个完成时调用异步函数,使用队列。剩下的问题是堆栈,如果异步函数需要更多时间调用者。但这就是生活。【参考方案2】:

为了避免永久调用resolveNext(),可以这样实现吗?

class AsyncQueue 
  
    /* delayBetween: delay (ms) before calling next item
        */
  constructor( delayBetween) 
        this.queue = [];
        this.id = 0;
        if (delayBetween < 1) 
            delayBetween = 1;
        
        this.delayBetween = delayBetween;
        this.timer = null;
    // setInterval( this.resolveNext.bind(this), this.delayBetween);
  
    
  add(fn, param) 
    return new Promise( resolve => 
        // liste inversée : le dernier élément ajouté est au début du tableau
        this.id ++;
        param.queueId = this.id;
        // console.log( `$new Date().yyyymmddhhmmsslll() > push request: $JSON.stringify(param)`);
      this.queue.unshift(  fn, param, resolve  );
        // console.log( `$new Date().yyyymmddhhmmsslll() > add() > setTimeout...`);
        if (this.timer == null) 
            this.timer = setTimeout( this.resolveNext.bind(this), this.delayBetween);
        
    );
  
    
  resolveNext() 
        this.timer = null;
        // console.log( `$new Date().yyyymmddhhmmsslll() > resolveNext() > called, len: $this.queue.length...`);
    if ( ! this.queue.length) return;       
    const  fn, param, resolve  = this.queue.pop();
        // console.log( `$new Date().yyyymmddhhmmsslll() > pop request: $JSON.stringify(param)`);
        // execute fn, and call resolve only when finished
    // fn(param).then(resolve);
    fn(param).then((result) => 
        // console.log( `$new Date().yyyymmddhhmmsslll() > fn resolved: $JSON.stringify(result)`);
        if (this.timer == null) 
            this.timer = setTimeout( this.resolveNext.bind(this), this.delayBetween);
        
    );
  

【讨论】:

以上是关于Nodejs 异步承诺队列的主要内容,如果未能解决你的问题,请参考以下文章

NodeJS 承诺睡眠时间太长

刷新承诺队列?

Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统

Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统

iOS(Swift) TaskProtocol异步任务队列

异步队列永远不会触发随机排放