如何使用 Node.js 在 MongoDB 中使用 cursor.forEach()?

Posted

技术标签:

【中文标题】如何使用 Node.js 在 MongoDB 中使用 cursor.forEach()?【英文标题】:How can I use a cursor.forEach() in MongoDB using Node.js? 【发布时间】:2014-10-19 21:35:24 【问题描述】:

我的数据库中有大量文档,我想知道如何遍历所有文档并更新它们,每个文档都有不同的值。

【问题讨论】:

这取决于您用于连接 MongoDB 的驱动程序。 我使用的是mongodb驱动 你能给我一些关于 forEach() 内部更新的例子,并指定你在哪里关闭与数据库的连接,因为我遇到了问题 【参考方案1】:

我寻找了一个性能良好的解决方案,我最终创建了一个我认为效果很好的组合:

/**
 * This method will read the documents from the cursor in batches and invoke the callback
 * for each batch in parallel.
 * IT IS VERY RECOMMENDED TO CREATE THE CURSOR TO AN OPTION OF BATCH SIZE THAT WILL MATCH
 * THE VALUE OF batchSize. This way the performance benefits are maxed out since
 * the mongo instance will send into our process memory the same number of documents
 * that we handle in concurrent each time, so no memory space is wasted
 * and also the memory usage is limited.
 *
 * Example of usage:
 * const cursor = await collection.aggregate([
     ..., ...],
     
        cursor: batchSize: BATCH_SIZE // Limiting memory use
    );
 DbUtil.concurrentCursorBatchProcessing(cursor, BATCH_SIZE, async (doc) => ...)
 * @param cursor - A cursor to batch process on.
 * We can get this from our collection.js API by either using aggregateCursor/findCursor
 * @param batchSize - The batch size, should match the batchSize of the cursor option.
 * @param callback - Callback that should be async, will be called in parallel for each batch.
 * @return Promise<void>
 */
static async concurrentCursorBatchProcessing(cursor, batchSize, callback) 
    let doc;
    const docsBatch = [];

    while ((doc = await cursor.next())) 
        docsBatch.push(doc);

        if (docsBatch.length >= batchSize) 
            await PromiseUtils.concurrentPromiseAll(docsBatch, async (currDoc) => 
                return callback(currDoc);
            );

            // Emptying the batch array
            docsBatch.splice(0, docsBatch.length);
        
    

    // Checking if there is a last batch remaining since it was small than batchSize
    if (docsBatch.length > 0) 
        await PromiseUtils.concurrentPromiseAll(docsBatch, async (currDoc) => 
            return callback(currDoc);
        );
    

读取大量大型文档并更新它们的示例:

        const cursor = await collection.aggregate([
        
            ...
        
    ], 
        cursor: batchSize: BATCH_SIZE, // Limiting memory use 
        allowDiskUse: true
    );

    const bulkUpdates = [];

    await DbUtil.concurrentCursorBatchProcessing(cursor, BATCH_SIZE, async (doc: any) => 
        const update: any = 
            updateOne: 
                filter: 
                    ...
                ,
                update: 
                   ...
                
            
        ;            

        bulkUpdates.push(update);

        // Updating if we read too many docs to clear space in memory
        await this.bulkWriteIfNeeded(bulkUpdates, collection);
    );

    // Making sure we updated everything
    await this.bulkWriteIfNeeded(bulkUpdates, collection, true);

...

    private async bulkWriteParametersIfNeeded(
    bulkUpdates: any[], collection: any,
    forceUpdate = false, flushBatchSize) 

    if (bulkUpdates.length >= flushBatchSize || forceUpdate) 
        // concurrentPromiseChunked is a method that loops over an array in a concurrent way using lodash.chunk and Promise.map
        await PromiseUtils.concurrentPromiseChunked(bulkUpsertParameters, (upsertChunk: any) => 
            return techniquesParametersCollection.bulkWrite(upsertChunk);
        );

        // Emptying the array
        bulkUpsertParameters.splice(0, bulkUpsertParameters.length);
    

【讨论】:

【参考方案2】: var MongoClient = require('mongodb').MongoClient, assert = require('assert'); MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) assert.equal(err, null); console.log("Successfully connected to MongoDB."); var query = "category_code": "biotech" ; db.collection('companies').find(query).toArray(function(err, docs) assert.equal(err, null); assert.notEqual(docs.length, 0); docs.forEach(function(doc) console.log(doc.name + " is a " + doc.category_code + " company."); ); db.close(); ); );

请注意,调用 .toArray 正在使应用程序获取整个数据集。

var MongoClient = require('mongodb').MongoClient, assert = require('assert'); MongoClient.connect('mongodb://localhost:27017/crunchbase', function(err, db) assert.equal(err, null); console.log("Successfully connected to MongoDB."); var query = "category_code": "biotech" ; var cursor = db.collection('companies').find(query); function(doc) cursor.forEach( console.log(doc.name + " is a " + doc.category_code + " company."); , function(err) assert.equal(err, null); return db.close(); ); );

注意find() 返回的光标 被分配给var cursor。使用这种方法,我们不是一次获取内存中的所有数据并使用数据,而是将数据流式传输到我们的应用程序。 find() 可以立即创建一个游标,因为它实际上不会向数据库发出请求,直到我们尝试使用它将提供的一些文档。 cursor 的重点是描述我们的查询。 cursor.forEach 的第二个参数显示了当驱动程序耗尽或发生错误时该怎么做。

在上述代码的初始版本中,强制数据库调用的是toArray()。这意味着我们需要所有文档并希望它们位于array 中。

另外,MongoDB 以批处理格式返回数据。下图显示,来自游标(来自应用程序)的请求到MongoDB

forEachtoArray 更好,因为我们可以在文档进入时处理,直到到达最后。将其与toArray 进行对比 - 我们等待 ALL 检索文档并构建 entire 数组。这意味着我们没有从驱动程序和数据库系统协同工作以将结果批处理到您的应用程序这一事实中获得任何优势。批处理旨在提高内存开销和执行时间方面的效率。 如果可以的话,在你的应用程序中利用它

【讨论】:

游标在客户端处理数据时会导致额外的问题。由于系统的整个异步特性,数组更容易使用。【参考方案3】:

您现在可以使用(当然是在异步函数中):

for await (let doc of collection.find(query)) 
  await updateDoc(doc);


// all done

很好地序列化所有更新。

【讨论】:

doc 可以是const,因为它的范围在循环内。【参考方案4】:

假设我们有以下 MongoDB 数据。

Database name: users
Collection name: jobs
===========================
Documents
 "_id" : ObjectId("1"), "job" : "Security", "name" : "Jack", "age" : 35 
 "_id" : ObjectId("2"), "job" : "Development", "name" : "Tito" 
 "_id" : ObjectId("3"), "job" : "Design", "name" : "Ben", "age" : 45
 "_id" : ObjectId("4"), "job" : "Programming", "name" : "John", "age" : 25 
 "_id" : ObjectId("5"), "job" : "IT", "name" : "ricko", "age" : 45 
==========================

这段代码:

var MongoClient = require('mongodb').MongoClient;
var dbURL = 'mongodb://localhost/users';

MongoClient.connect(dbURL, (err, db) => 
    if (err) 
        throw err;
     else 
        console.log('Connection successful');
        var dataBase = db.db();
        // loop forEach
        dataBase.collection('jobs').find().forEach(function(myDoc)
        console.log('There is a job called :'+ myDoc.job +'in Database'))
);

【讨论】:

【参考方案5】:

之前的答案都没有提到批量更新。这使得它们非常慢 ? - 比使用 bulkWrite 的解决方案慢几十或几百倍。

假设您想将每个文档中某个字段的值加倍。以下是如何在固定内存消耗的情况下快速完成?:

// Double the value of the 'foo' field in all documents
let bulkWrites = [];
const bulkDocumentsSize = 100;  // how many documents to write at once
let i = 0;
db.collection.find( ... ).forEach(doc => 
  i++;

  // Update the document...
  doc.foo = doc.foo * 2;

  // Add the update to an array of bulk operations to execute later
  bulkWrites.push(
    replaceOne: 
      filter:  _id: doc._id ,
      replacement: doc,
    ,
  );

  // Update the documents and log progress every `bulkDocumentsSize` documents
  if (i % bulkDocumentsSize === 0) 
    db.collection.bulkWrite(bulkWrites);
    bulkWrites = [];
    print(`Updated $i documents`);
  
);
// Flush the last <100 bulk writes
db.collection.bulkWrite(bulkWrites);

【讨论】:

【参考方案6】:

使用mongodb 驱动程序和带有async/await 的现代NodeJS,一个好的解决方案是使用next()

const collection = db.collection('things')
const cursor = collection.find(
  bla: 42 // find all things where bla is 42
);
let document;
while ((document = await cursor.next())) 
  await collection.findOneAndUpdate(
    _id: document._id
  , 
    $set: 
      blu: 43
    
  );

这导致内存中一次只需要一个文档,而不是例如接受的答案,在开始处理文档之前,许多文档被吸入内存。在“大量收藏”的情况下(根据问题),这可能很重要。

如果文档很大,可以使用projection 进一步改进,以便只从数据库中获取那些需要的文档字段。

【讨论】:

@ZachSmith:这是正确的,但速度很慢。使用bulkWrite可以加速10倍或更多。 有hasNext 方法。并且应该使用它而不是while(document=....) 模式 你为什么说这只会将 1 个文档加载到内存中?您不需要添加到 find call tge batchSize 选项并为其分配值 1 吗? @ZivGlazer 这不是我要说的:提供的代码需要在任何时候都只需要一个文档在内存中。您所指的批量大小决定了在一个请求中从数据库中获取了多少文档,并决定了有多少文档将随时驻留在内存中,因此您在这个意义上是对的;但是批量大小为 1 可能不是一个好主意......找到一个好的批量大小是一个不同的问题:) 我同意,我想做一个类似的过程,阅读一个包含大文档的大集合并更新我阅读的每一个,我最终实现了这样的东西:【参考方案7】:

答案取决于您使用的驱动程序。我认识的所有 MongoDB 驱动程序都以一种或另一种方式实现了 cursor.forEach()

这里有一些例子:

node-mongodb-native

collection.find(query).forEach(function(doc) 
  // handle
, function(err) 
  // done or error
);

mongojs

db.collection.find(query).forEach(function(err, doc) 
  // handle
);

monk

collection.find(query,  stream: true )
  .each(function(doc)
    // handle doc
  )
  .error(function(err)
    // handle error
  )
  .success(function()
    // final callback
  );

mongoose

collection.find(query).stream()
  .on('data', function(doc)
    // handle doc
  )
  .on('error', function(err)
    // handle error
  )
  .on('end', function()
    // final callback
  );

.forEach 回调中更新文档

.forEach 回调中更新文档的唯一问题是您不知道所有文档何时更新。

要解决这个问题,您应该使用一些异步控制流解决方案。以下是一些选项:

async 承诺(when.js,bluebird)

这里是一个使用async的例子,使用它的queue feature:

var q = async.queue(function (doc, callback) 
  // code for your update
  collection.update(
    _id: doc._id
  , 
    $set: hi: 'there'
  , 
    w: 1
  , callback);
, Infinity);

var cursor = collection.find(query);
cursor.each(function(err, doc) 
  if (err) throw err;
  if (doc) q.push(doc); // dispatching doc to async.queue
);

q.drain = function() 
  if (cursor.isClosed()) 
    console.log('all items have been processed');
    db.close();
  

【讨论】:

我可以在里面使用一个更新函数来获取 doc._id 作为来自 for each 的查询吗? 是的,你可以。但您需要等待所有更新操作完成后才能关闭连接。 你能添加一个 Promise 示例来迭代 .forEach 吗? 值得一提的是,现在 node-mongodb-native 有一个名为 forEach 的 API,它接受 callback。 对 mongoose 的一个小提示 - .stream 方法已被弃用,现在我们应该使用 .cursor【参考方案8】:

node-mongodb-native现在支持endCallback参数到cursor.forEach,用于处理整个迭代后的事件,详细参考官方文档http://mongodb.github.io/node-mongodb-native/2.2/api/Cursor.html#forEach。

另请注意,.each 现在在 nodejs 原生驱动程序中已弃用。

【讨论】:

您能否提供一个在处理完所有结果后触发回调的示例? @chovy in forEach(iteratorCallback, endCallback) endCallback(error) 在没有更多数据时调用(error 未定义)。【参考方案9】:

下面是一个使用 Mongoose 游标与 promise 异步的示例:

new Promise(function (resolve, reject) 
  collection.find(query).cursor()
    .on('data', function(doc) 
      // ...
    )
    .on('error', reject)
    .on('end', resolve);
)
.then(function () 
  // ...
);

参考:

Mongoose cursors Streams and promises

【讨论】:

如果您有一个相当大的文档列表,这不会耗尽内存吗? (即:10M 文档) @chovy 理论上不应该,这就是为什么你首先使用游标而不是加载数组中的所有内容。只有在游标结束或出错后,promise 才会兑现。如果您确实有这样的数据库,那么测试它应该不会太难,我自己会很好奇。 您可以使用限制来避免获得 10M 文档,我不知道有任何人可以一次阅读 10M 文档,或者可以在一页上直观呈现它们的屏幕(移动设备、笔记本电脑或桌面 - 忘记手表【参考方案10】:

Leonid's answer 很棒,但我想强调使用 async/promises 的重要性,并通过 Promise 示例提供不同的解决方案。

解决这个问题最简单的方法是循环 forEach 文档并调用更新。通常,您是don't need close the db connection after each request,但如果您确实需要关闭连接,请小心。如果您确定所有更新已完成执行,则必须关闭它。

这里的一个常见错误是在调度所有更新后调用db.close(),而不知道它们是否已完成。如果你这样做,你会得到错误。

错误的实现

collection.find(query).each(function(err, doc) 
  if (err) throw err;

  if (doc) 
    collection.update(query, update, function(err, updated) 
      // handle
    );
   
  else 
    db.close(); // if there is any pending update, it will throw an error there
  
);

然而,由于db.close() 也是一个异步操作(its signature 有一个回调选项),你可能很幸运,这段代码可以顺利完成。它可能仅在您只需要更新一个小集合中的几个文档时才有效(所以,不要尝试)。

正确的解决方案:

由于Leonid 已经提出了一个异步解决方案,下面遵循使用Q promises 的解决方案。

var Q = require('q');
var client = require('mongodb').MongoClient;

var url = 'mongodb://localhost:27017/test';

client.connect(url, function(err, db) 
  if (err) throw err;

  var promises = [];
  var query = ; // select all docs
  var collection = db.collection('demo');
  var cursor = collection.find(query);

  // read all docs
  cursor.each(function(err, doc) 
    if (err) throw err;

    if (doc) 

      // create a promise to update the doc
      var query = doc;
      var update =  $set: hi: 'there' ;

      var promise = 
        Q.npost(collection, 'update', [query, update])
        .then(function(updated) 
          console.log('Updated: ' + updated); 
        );

      promises.push(promise);
     else 

      // close the connection after executing all promises
      Q.all(promises)
      .then(function() 
        if (cursor.isClosed()) 
          console.log('all items have been processed');
          db.close();
        
      )
      .fail(console.error);
    
  );
);

【讨论】:

当我尝试这个时,我收到错误 doc is not defined 的行 var promises = calls.map(myUpdateFunction(doc)); @robocode,你!我已经修复了错误,示例现在运行良好。 这种方法要求您在处理结果之前将整个数据集加载到内存中。对于大型数据集,这不是一种实用的方法(您将耗尽内存)。你也可以只用我们 .toArray(),它更简单(当然也需要加载整个数据集)。 @UpTheCreek,感谢您的反馈。我已经更新了我的答案以仅存储 promise 对象而不是 doc 对象。它使用的内存要少得多,因为 promises 对象是一个存储状态的小 JSON。 @UpTheCreek,为了获得更好的内存使用解决方案,您需要执行两次查询:第一次使用“计数”,第二次获取光标。每次更新完成后,您将增加一个变量,并且仅在此变量达到结果总数后才停止程序(并关闭数据库连接)。

以上是关于如何使用 Node.js 在 MongoDB 中使用 cursor.forEach()?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Node.js 在 MongoDB 中使用 cursor.forEach()?

如何使用 mongodb (node.js) 在集合中创建一个包含所有值的数组

如何使用node.js中的循环将表单数据保存为对象和对象数组到mongodb?

如何在 Node.js 中使用 mongoose 将大数组保存到 MongoDB

node.js如何配置mongodb连接池?

如何使用 mongoose 将本地 mongodb 与 node.js 应用程序连接?