NodeJS异步队列太快(减慢异步队列方法)
Posted
技术标签:
【中文标题】NodeJS异步队列太快(减慢异步队列方法)【英文标题】:NodeJS async queue too fast (Slowing down async queue method) 【发布时间】:2012-12-31 08:11:59 【问题描述】:我有一个 HTTP Get 请求,我想解析响应并将其保存到我的数据库中。
如果我单独调用 crawl(i),我会得到很好的结果。但我必须从 1 到 2000 调用 crawl()。 我得到了很好的结果,但有些响应似乎丢失了,有些响应是重复的。我想我不明白如何调用数千个异步函数。我正在使用async module 队列功能,但到目前为止我仍然缺少一些数据并且仍然有一些重复。我在这里做错了什么?感谢您的帮助。
What i am crawling
我的节点功能:
function getOptions(i)
return
host: 'magicseaweed.com',
path: '/syndicate/rss/index.php?id='+i+'&unit=uk',
method: 'GET'
;
function crawl(i)
var req = http.request(getOptions(i), function(res)
res.on('data', function (body)
parseLocation(body);
);
);
req.end();
function parseLocation(body)
parser.parseString(body, function(err, result)
if(result && typeof result.rss != 'undefined')
var locationTitle = result.rss.channel[0].title;
var locationString = result.rss.channel[0].item[0].link[0];
var location = new Location(
id: locationString.split('/')[2],
name: locationTitle
);
location.save();
);
N = 2 //# of simultaneous tasks
var q = async.queue(function (task, callback)
crawl(task.url);
callback();
, N);
q.drain = function()
console.log('Crawling done.');
for(var i = 0; i < 100; i++)
q.push(url: 'http://magicseaweed.com/syndicate/rss/index.php?id='+i+'&unit=uk');
[编辑] 好吧,经过大量测试后,我正在抓取的服务似乎无法快速处理这么多请求。因为当我按顺序执行每个请求时,我可以获得所有好的响应。
有没有办法减缓异步队列的方法?
【问题讨论】:
现在好像很多请求都失败了……我怎样才能保证请求不会失败? 您是否尝试使用 setTimeOut 来延迟队列工作函数中的callback()
调用?这样可以减慢队列中任务的执行速度。
【参考方案1】:
你应该看看这个很棒的模块,async,它简化了这样的异步任务。可以使用队列,简单例子:
N = # of simultaneous tasks
var q = async.queue(function (task, callback)
somehttprequestfunction(task.url, function()
callback();
, N);
q.drain = function()
console.log('all items have been processed');
for(var i = 0; i < 2000; i++)
q.push(url:"http://somewebsite.com/"+i+"/feed/");
如果您只调用回调函数,它将有一个正在进行的操作的窗口,并且任务室将可用于未来的任务。不同的是,您的代码现在立即打开 2000 个连接,显然失败率很高。将其限制在一个合理的值,5,10,20(取决于站点和连接)将导致更好的成功率。如果请求失败,您可以随时重试,或将任务推送到另一个异步队列以进行另一次试用。关键是在队列函数中调用callback(),这样房间完成后就可以使用了。
【讨论】:
我尝试了您的解决方案,但我仍然有重复和很多缺失的响应。我已经更新了我的问题和代码,你能解释一下吗?谢谢。 由于您的要求很简单,您可以使用request module,我目前看不出您的代码有什么问题。 他们是否在等待限制每秒从队列中弹出多少项目? 您错过了这样一个事实,即 Node.js 对正在处理为 C++ 或 VB 的函数没有标准的阻塞效果,因此,一旦进行调用,它会提前进行下一次调用而无需等待为响应。为了解决这个问题,您应该使用 Promise(使用 Promise 搜索 Node.js http)。使用带有异步的场所,将允许您在服务器中创建一个任务缓冲区,因此您的代码不会受到大量连接和相应的响应延迟的影响,TL;DR 如果您的爬网函数只有顺序操作(如数学)它一切都会完美的 由于某种原因 q.drain 没有被触发。将 q.drain = function() 更改为箭头函数 q.drain(() => ) 解决了我的问题【参考方案2】:var q = async.queue(function (task, callback)
crawl(task.url);
callback();
, N);
你在启动前一个任务后立即执行下一个任务,这样队列就没有意义了。你应该像这样修改你的代码:
// first, modify your 'crawl' function to take a callback argument, and call this callback after the job is done.
// then
var q = async.queue(function (task, next/* name this argument as 'next' is more meaningful */)
crawl(task.url, function ()
// after this one is done, start next one.
next();
);
// or, more simple way, crawl(task.url, next);
, N);
【讨论】:
【参考方案3】:如果您愿意,还有其他选择。没有花哨的库的 Vanilla JS。
var incrementer = 0;
var resultsArray = [];
var myInterval = setInterval(function()
incrementer++
if(incrementer == 100)
clearInterval(myInterval)
//when done parse results array
//make request here
//push request result to array here
, 500);
每半秒调用一次函数。在 x 个请求后强制同步和退出的简单方法。
【讨论】:
我们曾经使用过这种方法,但是它确实有一个问题:作为任何HTTP相关服务,都有限制,而您可以知道最大请求率并调整间隔,如果服务器窒息,它之后的所有请求都会得到 503(请求太多),否则您将不得不降低限制以弥补这一点【参考方案4】:我知道我的问题有点晚了,但是这是我写的一个解决方案,用于在使用节点 4 或节点 5 测试 api 端点时减慢请求的数量:
var fs = require('fs');
var supertest = require('supertest');
var request = supertest("http://sometesturl.com/api/test/v1/")
var Helper = require('./check.helper');
var basicAuth = Helper.basicAuth;
var options = Helper.options;
fs.readFile('test.txt', function(err, data)
var parsedItems = JSON.parse(data);
var urlparts = []
// create a queue
for (let year of range(1975, 2016))
for (var make in parsedItems[year])
console.log(year, make, '/models/' + year + '/' + make)
urlparts.push(urlpart:'/models/' + year + '/' + make, year: year, make: make)
// start dequeue
waitDequeue();
// This function calls itself after the makeRequest promise completes
function waitDequeue()
var item = urlparts.pop()
if (item)
makeRequest(item)
.then(function()
// wait this time before next dequeue
setTimeout(function()
waitDequeue();
, 3000);
)
else
write(parsedItems)
// make a request, mutate parsedItems then resolve
function makeRequest(item)
return new Promise((resolve, reject)=>
request
.get(item.urlpart)
.set(options.auth[0], options.auth[1])
.set(options.type[0], options.type[1])
.end(function(err, res)
if (err) return done1(err);
console.log(res.body)
res.body.forEach(function(model)
parsedItems[item.year][item.make][model] =
);
resolve()
)
)
// write the results back to the file
function write(parsedItems)
fs.writeFile('test.txt', JSON.stringify(parsedItems, null, 4), function(err)
console.log(err)
)
)
【讨论】:
【参考方案5】:有点晚了,但我发现这行得通! 使用异步,您可以通过在任务处理程序中使用 while 来减慢队列速度,例如:
var q = async.priorityQueue(function(task, callback)
// your code process here for each task
//when ready to complete the task delay it by calling
async.whilst( //wait 6 seconds
function()
return count < 10;
,
function(callback)
count++;
setTimeout(function()
callback(null, count);
, 1000);
,
function (err, n)
// n seconds have passed
callback(); //callback to q handler
); //whilst
, 5);
【讨论】:
以上是关于NodeJS异步队列太快(减慢异步队列方法)的主要内容,如果未能解决你的问题,请参考以下文章
Redis + NodeJS 实现一个能处理海量数据的异步任务队列系统