连续迭代 mongodb 游标(在移动到下一个文档之前等待回调)

Posted

技术标签:

【中文标题】连续迭代 mongodb 游标(在移动到下一个文档之前等待回调)【英文标题】:Iterating over a mongodb cursor serially (waiting for callbacks before moving to next document) 【发布时间】:2013-08-09 18:34:42 【问题描述】:

使用 mongoskin,我可以做这样的查询,它会返回一个游标:

myCollection.find(, function(err, resultCursor) 
      resultCursor.each(function(err, result) 

      

但是,我想为每个文档调用一些异步函数,并在回调后才移动到光标上的下一项(类似于 async.js 模块中的 eachSeries 结构)。例如:

myCollection.find(, function(err, resultCursor) 
      resultCursor.each(function(err, result) 

            externalAsyncFunction(result, function(err) 
               //externalAsyncFunction completed - now want to move to next doc
            );

      
  

我该怎么做?

谢谢

更新:

我不想使用toArray(),因为这是一个大批量操作,结果可能一次无法放入内存。

【问题讨论】:

如果您在继续之前阻塞并等待异步函数完成,那么异步调用它有什么意义? @RotemHermon 我别无选择!这不是我的功能,它是异步的。 (将 myAsyncFunction 重命名为 externalAsyncFunction...) 你为什么不使用toArray(),然后使用递归函数来迭代结果? @Салман - 好问题 - 我没有使用 toArray 因为它是一个大批量操作,完整的结果可能不适合内存。 (我会更新问题) 【参考方案1】:

您可以在Array 中获取结果并使用递归函数进行迭代,类似这样。

myCollection.find().toArray(function (err, items) 
    var count = items.length;
    var fn = function () 
        externalAsyncFuntion(items[count], function () 
            count -= 1;
            if (count) fn();
        )
    

    fn();
);

编辑:

这仅适用于小型数据集,对于较大的数据集,您应该使用其他答案中提到的游标。

【讨论】:

抱歉,我在 cmets 中回答您的问题太慢了 - 由于结果集太大,我无法使用 toArray。 哦,好的。那么另一个答案适合你。 虽然如此,但最好避免这种模式,因为它会随着数据的增长而中断。【参考方案2】:

如果您不想使用 toArray 将所有结果加载到内存中,您可以使用光标进行迭代,如下所示。

myCollection.find(, function(err, resultCursor) 
  function processItem(err, item) 
    if(item === null) 
      return; // All done!
    

    externalAsyncFunction(item, function(err) 
      resultCursor.nextObject(processItem);
    );

  

  resultCursor.nextObject(processItem);
  

【讨论】:

这种方法不适用于大型数据集。我收到“RangeError:超出最大调用堆栈大小”。 @SoichiHayashi 将异步函数或回调包装在 process.nextTick! @SoichiHayashi 跟进@zamnuts - 你的堆栈溢出上面例子的原因是因为每次你处理一个项目时,你运行另一个回调来处理下一个项目在当前的处理函数。随着结果集的增长,您会循环访问更多的函数调用,并且每个调用都会在前一个之上创建一个新的堆栈框架。将异步回调包装在 process.nextTicksetImmediatesetTimeout 中会使其在下一个循环中运行,即我们为处理每个文档而创建的调用堆栈“外部”。 cursor.forEach() 呢? @Redsandro - cursor.forEach() 没有提供异步信号方式来移动到下一个项目。【参考方案3】:

您可以使用异步库执行类似的操作。这里的关键是检查当前文档是否为空。如果是,则表示您已完成。

async.series([
        function (cb) 
            cursor.each(function (err, doc) 
                if (err) 
                    cb(err);
                 else if (doc === null) 
                    cb();
                 else 
                    console.log(doc);
                    array.push(doc);
                
            );
        
    ], function (err) 
        callback(err, array);
    );

【讨论】:

嗨 Antoine - 我使用这种方法遇到的问题是,如果您需要对每条记录异步执行某些操作,那么游标循环无法等待直到完成。 (cursor.each 不提供“下一个”回调,因此只能在其中进行同步操作)。【参考方案4】:

您可以使用简单的 setTimeOut。这是在 nodejs 上运行的 typescript 中的一个示例(我通过'when'模块使用承诺,但也可以在没有它们的情况下完成):

        import mongodb = require("mongodb");

        var dbServer = new mongodb.Server('localhost', 27017, auto_reconnect: true, );
        var db =  new mongodb.Db('myDb', dbServer);

        var util = require('util');
        var when = require('when'); //npm install when

        var dbDefer = when.defer();
        db.open(function() 
            console.log('db opened...');
            dbDefer.resolve(db);
        );

        dbDefer.promise.then(function(db : mongodb.Db)
            db.collection('myCollection', function (error, dataCol)
                if(error) 
                    console.error(error); return;
                

                var doneReading = when.defer();

                var processOneRecordAsync = function(record) : When.Promise
                    var result = when.defer();

                    setTimeout (function() 
                        //simulate a variable-length operation
                        console.log(util.inspect(record));
                        result.resolve('record processed');
                    , Math.random()*5);

                    return result.promise;
                

                var runCursor = function (cursor : MongoCursor)
                    cursor.next(function(error : any, record : any)
                        if (error)
                            console.log('an error occurred: ' + error);
                            return;
                        
                        if (record)
                            processOneRecordAsync(record).then(function(r)
                                setTimeout(function() runCursor(cursor), 1);
                            );
                        
                        else
                            //cursor up
                            doneReading.resolve('done reading data.');
                        
                    );
                

                dataCol.find(, function(error, cursor : MongoCursor)
                    if (!error)
                    
                        setTimeout(function() runCursor(cursor), 1);
                    
                );

                doneReading.promise.then(function(message : string)
                    //message='done reading data'
                    console.log(message);
                );
            );
        );

【讨论】:

【参考方案5】:

你可以使用 Future:

myCollection.find(, function(err, resultCursor) 
    resultCursor.count(Meteor.bindEnvironment(function(err,count)
        for(var i=0;i<count;i++)
        
            var itemFuture=new Future();

            resultCursor.nextObject(function(err,item))
                itemFuture.result(item);
            

            var item=itemFuture.wait();
            //do what you want with the item, 
            //and continue with the loop if so

        
    ));
);

【讨论】:

【参考方案6】:

如果有人正在寻找一种 Promise 方式来执行此操作(而不是使用 nextObject 的回调),那么就是这样。我正在使用 Node v4.2.2 和 mongo 驱动程序 v2.1.7。这是 Cursor.forEach() 的 asyncSeries 版本:

function forEachSeries(cursor, iterator) 
  return new Promise(function(resolve, reject) 
    var count = 0;
    function processDoc(doc) 
      if (doc != null) 
        count++;
        return iterator(doc).then(function() 
          return cursor.next().then(processDoc);
        );
       else 
        resolve(count);
      
    
    cursor.next().then(processDoc);
  );

要使用它,请传递光标和一个异步操作每个文档的迭代器(就像您对 Cursor.forEach 所做的那样)。迭代器需要返回一个 promise,就像大多数 mongodb 原生驱动函数一样。

假设您要更新集合test 中的所有文档。你会这样做:

var theDb;
MongoClient.connect(dbUrl).then(function(db) 
  theDb = db;     // save it, we'll need to close the connection when done.
  var cur = db.collection('test').find();

  return forEachSeries(cur, function(doc)     // this is the iterator
    return db.collection('test').updateOne(
      _id: doc._id,
      $set: updated: true       // or whatever else you need to change
    );
    // updateOne returns a promise, if not supplied a callback. Just return it.
  );
)
.then(function(count) 
  console.log("All Done. Processed", count, "records");
  theDb.close();
)

【讨论】:

我看不到forEachSeries 的调用位置。 调用栈溢出。【参考方案7】:

这适用于使用 setImmediate 处理大型数据集:

var cursor = collection.find(filter...).cursor();

cursor.nextObject(function fn(err, item) 
    if (err || !item) return;

    setImmediate(fnAction, item, arg1, arg2, function() 
        cursor.nextObject(fn);
    );
);

function fnAction(item, arg1, arg2, callback) 
    // Here you can do whatever you want to do with your item.
    return callback();

【讨论】:

这很好,但你不需要第一行的“.cursor()”(我有一个错误)。 这取决于使用的猫鼬版本。它是给一个年长的人【参考方案8】:

使用async/await 的更现代的方法:

const cursor = db.collection("foo").find();
while(await cursor.hasNext()) 
  const doc = await cursor.next();
  // process doc here

注意事项:

当async iterators 到达时,这可能更加简单。 您可能需要添加 try/catch 以进行错误检查。 包含函数应该是async,或者代码应该包裹在(async function() ... )()中,因为它使用await。 如果需要,请在 while 循环的末尾添加 await new Promise(resolve =&gt; setTimeout(resolve, 1000));(暂停 1 秒),以表明它会一个接一个地处理文档。

【讨论】:

完美运行,谢谢。知道大型数据集是否存在任何缺陷? 太好了,这是最好的解决方案,不像选择的会崩溃 如何在节点中使用它?我收到此错误:“cursor.hasNext()”处的“SyntaxError: Unexpected identifier” @Nico,抱歉回复晚了,但请参阅注释中的第 3 点;)【参考方案9】:

since node.js v10.3你可以使用异步迭代器

const cursor = db.collection('foo').find();
for await (const doc of cursor) 
  // do your thing
  // you can even use `await myAsyncOperation()` here

Jake Archibald 写了关于异步迭代器的 a great blog post,这是我在阅读 @user993683 的回答后才知道的。

【讨论】:

正在寻找这个解决方案,谢谢!

以上是关于连续迭代 mongodb 游标(在移动到下一个文档之前等待回调)的主要内容,如果未能解决你的问题,请参考以下文章

MongoDB Cursor - 计数和枚举文档的有效方法(在节点中)

mongodb3.2系统性学习——5游标 模糊查询 findAndModify函数

Python + MongoDB - 光标迭代太慢 - 未解决?

Linux中vi编辑器常用命令

vi编辑器的常用命令

mongo 进阶——查询 - 掘金