Nodejs 异步承诺队列
Posted
技术标签:
【中文标题】Nodejs 异步承诺队列【英文标题】:Nodejs Async Promise Queue 【发布时间】:2018-11-03 01:13:24 【问题描述】:我需要使用速率受限的 API。例如,我在一秒钟内只能进行 10 次 API 调用,所以我需要等待当前秒结束才能进行另一个 API 调用。
为了实现这一点,我想创建一个可以自行管理的异步队列。它的主要功能是让我向队列中添加一个新的 Promise,当 Promise 解决时,应用程序会收到通知:
let queue = new Queue()
queue.add(api.get('/somepath')).then(res => // handle response );
如何使用普通的 Promise 实现这一点?
export class AsyncQueue
private queue: Array<Promise<any>>;
add(promise, fct)
this.queue.push(promise);
resolveNext()
this.queue.pop().then(
// how to resolve the waiting promise in my application
)
get length()
return this.queue.length
【问题讨论】:
这听起来像是 Observables 的工作。 我认为只使用promise就可以做到吗? 看看你的代码,api.get()
会被立即调用。在队列中被pop()
ed 后不应该被调用吗?
为什么不将响应处理承诺链接到 api 调用?
它可以用一根针和一只稳定的手来完成,但我通常更喜欢经过实战考验、拥有蓬勃发展的社区的 pubsub,而不是我自己的临时实现。
【参考方案1】:
在当前实现中,api.get()
将在add
加入队列时立即被调用。你应该add
path 代替(或者可能同时使用api.get
和path
)并让AsyncQueue
在可行时初始化Promise。确保让 add
返回一个 Promise,一旦 API 调用完成,该 Promise 就会解析。
例如,在 vanilla JS 中,它可能如下所示:
const apiGet = () => new Promise(resolve => setTimeout(resolve, 1000));
class AsyncQueue
queue = [];
constructor()
setInterval(this.resolveNext.bind(this), 2000);
add(fn, param)
return new Promise(resolve =>
this.queue.unshift( fn, param, resolve );
);
resolveNext()
if (!this.queue.length) return;
const fn, param, resolve = this.queue.pop();
fn(param).then(resolve);
const queue = new AsyncQueue()
console.log('start');
// Will resolve after 2000 + 1000 seconds:
queue.add(apiGet, '/somepath').then(res =>
console.log('handling response 1');
);
// Will resolve after 4000 + 1000 seconds:
queue.add(apiGet, '/somepath').then(res =>
console.log('handling response 2');
);
【讨论】:
这是一个非常好的方法。 我有很多时间等待找到解决我的问题的方法,即在前一个完成时调用异步函数,使用队列。剩下的问题是堆栈,如果异步函数需要更多时间调用者。但这就是生活。【参考方案2】:为了避免永久调用resolveNext(),可以这样实现吗?
class AsyncQueue
/* delayBetween: delay (ms) before calling next item
*/
constructor( delayBetween)
this.queue = [];
this.id = 0;
if (delayBetween < 1)
delayBetween = 1;
this.delayBetween = delayBetween;
this.timer = null;
// setInterval( this.resolveNext.bind(this), this.delayBetween);
add(fn, param)
return new Promise( resolve =>
// liste inversée : le dernier élément ajouté est au début du tableau
this.id ++;
param.queueId = this.id;
// console.log( `$new Date().yyyymmddhhmmsslll() > push request: $JSON.stringify(param)`);
this.queue.unshift( fn, param, resolve );
// console.log( `$new Date().yyyymmddhhmmsslll() > add() > setTimeout...`);
if (this.timer == null)
this.timer = setTimeout( this.resolveNext.bind(this), this.delayBetween);
);
resolveNext()
this.timer = null;
// console.log( `$new Date().yyyymmddhhmmsslll() > resolveNext() > called, len: $this.queue.length...`);
if ( ! this.queue.length) return;
const fn, param, resolve = this.queue.pop();
// console.log( `$new Date().yyyymmddhhmmsslll() > pop request: $JSON.stringify(param)`);
// execute fn, and call resolve only when finished
// fn(param).then(resolve);
fn(param).then((result) =>
// console.log( `$new Date().yyyymmddhhmmsslll() > fn resolved: $JSON.stringify(result)`);
if (this.timer == null)
this.timer = setTimeout( this.resolveNext.bind(this), this.delayBetween);
);
【讨论】:
以上是关于Nodejs 异步承诺队列的主要内容,如果未能解决你的问题,请参考以下文章
Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统