连续迭代 mongodb 游标(在移动到下一个文档之前等待回调)
Posted
技术标签:
【中文标题】连续迭代 mongodb 游标(在移动到下一个文档之前等待回调)【英文标题】:Iterating over a mongodb cursor serially (waiting for callbacks before moving to next document) 【发布时间】:2013-08-09 18:34:42 【问题描述】:使用 mongoskin,我可以做这样的查询,它会返回一个游标:
myCollection.find(, function(err, resultCursor)
resultCursor.each(function(err, result)
但是,我想为每个文档调用一些异步函数,并在回调后才移动到光标上的下一项(类似于 async.js 模块中的 eachSeries 结构)。例如:
myCollection.find(, function(err, resultCursor)
resultCursor.each(function(err, result)
externalAsyncFunction(result, function(err)
//externalAsyncFunction completed - now want to move to next doc
);
我该怎么做?
谢谢
更新:
我不想使用toArray()
,因为这是一个大批量操作,结果可能一次无法放入内存。
【问题讨论】:
如果您在继续之前阻塞并等待异步函数完成,那么异步调用它有什么意义? @RotemHermon 我别无选择!这不是我的功能,它是异步的。 (将 myAsyncFunction 重命名为 externalAsyncFunction...) 你为什么不使用toArray()
,然后使用递归函数来迭代结果?
@Салман - 好问题 - 我没有使用 toArray 因为它是一个大批量操作,完整的结果可能不适合内存。 (我会更新问题)
【参考方案1】:
您可以在Array
中获取结果并使用递归函数进行迭代,类似这样。
myCollection.find().toArray(function (err, items)
var count = items.length;
var fn = function ()
externalAsyncFuntion(items[count], function ()
count -= 1;
if (count) fn();
)
fn();
);
编辑:
这仅适用于小型数据集,对于较大的数据集,您应该使用其他答案中提到的游标。
【讨论】:
抱歉,我在 cmets 中回答您的问题太慢了 - 由于结果集太大,我无法使用 toArray。 哦,好的。那么另一个答案适合你。 虽然如此,但最好避免这种模式,因为它会随着数据的增长而中断。【参考方案2】:如果您不想使用 toArray 将所有结果加载到内存中,您可以使用光标进行迭代,如下所示。
myCollection.find(, function(err, resultCursor)
function processItem(err, item)
if(item === null)
return; // All done!
externalAsyncFunction(item, function(err)
resultCursor.nextObject(processItem);
);
resultCursor.nextObject(processItem);
【讨论】:
这种方法不适用于大型数据集。我收到“RangeError:超出最大调用堆栈大小”。 @SoichiHayashi 将异步函数或回调包装在process.nextTick
!
@SoichiHayashi 跟进@zamnuts - 你的堆栈溢出上面例子的原因是因为每次你处理一个项目时,你运行另一个回调来处理下一个项目在当前的处理函数。随着结果集的增长,您会循环访问更多的函数调用,并且每个调用都会在前一个之上创建一个新的堆栈框架。将异步回调包装在 process.nextTick
、setImmediate
或 setTimeout
中会使其在下一个循环中运行,即我们为处理每个文档而创建的调用堆栈“外部”。
cursor.forEach()
呢?
@Redsandro - cursor.forEach() 没有提供异步信号方式来移动到下一个项目。【参考方案3】:
您可以使用异步库执行类似的操作。这里的关键是检查当前文档是否为空。如果是,则表示您已完成。
async.series([
function (cb)
cursor.each(function (err, doc)
if (err)
cb(err);
else if (doc === null)
cb();
else
console.log(doc);
array.push(doc);
);
], function (err)
callback(err, array);
);
【讨论】:
嗨 Antoine - 我使用这种方法遇到的问题是,如果您需要对每条记录异步执行某些操作,那么游标循环无法等待直到完成。 (cursor.each 不提供“下一个”回调,因此只能在其中进行同步操作)。【参考方案4】:您可以使用简单的 setTimeOut。这是在 nodejs 上运行的 typescript 中的一个示例(我通过'when'模块使用承诺,但也可以在没有它们的情况下完成):
import mongodb = require("mongodb");
var dbServer = new mongodb.Server('localhost', 27017, auto_reconnect: true, );
var db = new mongodb.Db('myDb', dbServer);
var util = require('util');
var when = require('when'); //npm install when
var dbDefer = when.defer();
db.open(function()
console.log('db opened...');
dbDefer.resolve(db);
);
dbDefer.promise.then(function(db : mongodb.Db)
db.collection('myCollection', function (error, dataCol)
if(error)
console.error(error); return;
var doneReading = when.defer();
var processOneRecordAsync = function(record) : When.Promise
var result = when.defer();
setTimeout (function()
//simulate a variable-length operation
console.log(util.inspect(record));
result.resolve('record processed');
, Math.random()*5);
return result.promise;
var runCursor = function (cursor : MongoCursor)
cursor.next(function(error : any, record : any)
if (error)
console.log('an error occurred: ' + error);
return;
if (record)
processOneRecordAsync(record).then(function(r)
setTimeout(function() runCursor(cursor), 1);
);
else
//cursor up
doneReading.resolve('done reading data.');
);
dataCol.find(, function(error, cursor : MongoCursor)
if (!error)
setTimeout(function() runCursor(cursor), 1);
);
doneReading.promise.then(function(message : string)
//message='done reading data'
console.log(message);
);
);
);
【讨论】:
【参考方案5】:你可以使用 Future:
myCollection.find(, function(err, resultCursor)
resultCursor.count(Meteor.bindEnvironment(function(err,count)
for(var i=0;i<count;i++)
var itemFuture=new Future();
resultCursor.nextObject(function(err,item))
itemFuture.result(item);
var item=itemFuture.wait();
//do what you want with the item,
//and continue with the loop if so
));
);
【讨论】:
【参考方案6】:如果有人正在寻找一种 Promise 方式来执行此操作(而不是使用 nextObject 的回调),那么就是这样。我正在使用 Node v4.2.2 和 mongo 驱动程序 v2.1.7。这是 Cursor.forEach()
的 asyncSeries 版本:
function forEachSeries(cursor, iterator)
return new Promise(function(resolve, reject)
var count = 0;
function processDoc(doc)
if (doc != null)
count++;
return iterator(doc).then(function()
return cursor.next().then(processDoc);
);
else
resolve(count);
cursor.next().then(processDoc);
);
要使用它,请传递光标和一个异步操作每个文档的迭代器(就像您对 Cursor.forEach 所做的那样)。迭代器需要返回一个 promise,就像大多数 mongodb 原生驱动函数一样。
假设您要更新集合test
中的所有文档。你会这样做:
var theDb;
MongoClient.connect(dbUrl).then(function(db)
theDb = db; // save it, we'll need to close the connection when done.
var cur = db.collection('test').find();
return forEachSeries(cur, function(doc) // this is the iterator
return db.collection('test').updateOne(
_id: doc._id,
$set: updated: true // or whatever else you need to change
);
// updateOne returns a promise, if not supplied a callback. Just return it.
);
)
.then(function(count)
console.log("All Done. Processed", count, "records");
theDb.close();
)
【讨论】:
我看不到forEachSeries
的调用位置。
调用栈溢出。【参考方案7】:
这适用于使用 setImmediate 处理大型数据集:
var cursor = collection.find(filter...).cursor();
cursor.nextObject(function fn(err, item)
if (err || !item) return;
setImmediate(fnAction, item, arg1, arg2, function()
cursor.nextObject(fn);
);
);
function fnAction(item, arg1, arg2, callback)
// Here you can do whatever you want to do with your item.
return callback();
【讨论】:
这很好,但你不需要第一行的“.cursor()”(我有一个错误)。 这取决于使用的猫鼬版本。它是给一个年长的人【参考方案8】:使用async
/await
的更现代的方法:
const cursor = db.collection("foo").find();
while(await cursor.hasNext())
const doc = await cursor.next();
// process doc here
注意事项:
当async iterators 到达时,这可能更加简单。 您可能需要添加 try/catch 以进行错误检查。 包含函数应该是async
,或者代码应该包裹在(async function() ... )()
中,因为它使用await
。
如果需要,请在 while 循环的末尾添加 await new Promise(resolve => setTimeout(resolve, 1000));
(暂停 1 秒),以表明它会一个接一个地处理文档。
【讨论】:
完美运行,谢谢。知道大型数据集是否存在任何缺陷? 太好了,这是最好的解决方案,不像选择的会崩溃 如何在节点中使用它?我收到此错误:“cursor.hasNext()”处的“SyntaxError: Unexpected identifier” @Nico,抱歉回复晚了,但请参阅注释中的第 3 点;)【参考方案9】:since node.js v10.3你可以使用异步迭代器
const cursor = db.collection('foo').find();
for await (const doc of cursor)
// do your thing
// you can even use `await myAsyncOperation()` here
Jake Archibald 写了关于异步迭代器的 a great blog post,这是我在阅读 @user993683 的回答后才知道的。
【讨论】:
正在寻找这个解决方案,谢谢!以上是关于连续迭代 mongodb 游标(在移动到下一个文档之前等待回调)的主要内容,如果未能解决你的问题,请参考以下文章
MongoDB Cursor - 计数和枚举文档的有效方法(在节点中)
mongodb3.2系统性学习——5游标 模糊查询 findAndModify函数