为啥 Node.js 会以这种方式执行?

Posted

技术标签:

【中文标题】为啥 Node.js 会以这种方式执行?【英文标题】:Why is Node.js executing in this manner?为什么 Node.js 会以这种方式执行? 【发布时间】:2014-08-28 12:57:57 【问题描述】:

我有一个 Node.js 应用程序,用于将记录从 mysql 迁移到 MongoDB。我正在使用 Mongoose 和 async.js 来执行此操作,并且我注意到一些我不理解的行为。如果我有以下 Coffeescript 代码 (javascript here):

           # users is a collection of about 70k records
async.each users, ((user, callback) =>
    # console.log "saving user: #user.id of #users[users.length-1].id"
    model = new User
        id: user.id
        name:
            first: user.fname
            last: user.lname
    model.save (err) ->
        console.log "saving user: #user.id"
        model = null
        callback(err)
), (err) ->
    users = null
    callback(err)

model.save 的回调永远不会到达,我的 Node 进程将慢慢爬升至 1.5gb。如果我检查我的 mongodb 实例,我可以看到在处理完 users 集合中的所有 70k 项之后,记录将开始保存到 mongodb,但它们在 41k 左右停止。

我注意到,如果我从async.each 切换到async.eachSeries,则每条记录都会调用model.save,并且迁移成功完成。

我假设出于某种原因,Node 在执行 model.save 的回调之前,会针对 users 集合中的每个项目运行 async.each 的每次迭代,这会导致内存问题,但我不明白这是为什么。谁能告诉我为什么 Node 会这样做,以及为什么切换到 async.eachSeries 可以解决这个问题?

【问题讨论】:

可能是mongodb驱动遇到了并发问题,你试过用async.queue代替each吗? 【参考方案1】:

尼尔在提供解决方案方面做得很好,但我只是想谈谈你的问题:

谁能告诉我为什么 Node 会这样做,以及为什么切换到 async.eachSeries 可以解决这个问题?

如果您查看async.eachasync.eachSeries 的详细信息,您可能会注意到async.each 的文档指出:

将函数迭代器并行应用于 arr 中的每个项目

但是,async.eachSeries 表示:

和 each 一样,只是迭代器被应用到 arr 中的每一项上。仅在当前迭代器完成后才调用下一个迭代器。这意味着迭代器函数将按顺序完成。

详细来说,如果我们查看代码,您会发现 each 的代码最终调用了数组本身的原生 forEach 函数,并且每个元素都调用了迭代器 (link to source):

_each(arr, function (x) 
    iterator(x, only_once(done) );
);

调用:

var _each = function (arr, iterator) 
    if (arr.forEach) 
        return arr.forEach(iterator);
    

但是,对迭代器函数的每次调用最终都会调用model.save。这个 Mongoose 函数(除其他外)最终会执行 I/O 以将数据保存到数据库中。如果您要跟踪代码路径,您会看到它最终出现在一个调用 process.nextTick (link to source) 的函数中。

Node 的process.nextTick 函数通常用于这种情况(I/O),一旦执行流程结束就会处理回调。在这种情况下,只有在 forEach 循环完成后才会调用每个回调。 (这是有目的的,并且不会阻止任何代码执行。)

总结一下:

当使用async.each 时,您上面的代码将遍历所有用户,将保存排队,但只有在代码完成对所有用户的迭代后才开始处理它们。

当使用async.eachSeries 时,您上面的代码将一次处理每个用户,并且仅在保存完成后才处理下一个用户——当调用 eachSeries 回调时。

【讨论】:

精彩、清晰的答案。感谢您分解它!【参考方案2】:

那么,将厨房水槽扔到您的流程中肯定会出现问题。它本质上是在做你要求它做的事情,因此试图一次异步地“启动”所有这些“保存”操作。基本现实是,您只能处理这么多与 MongoDB 的连接,因此当您执行此操作时,某处会出现瓶颈。

如果您实际上不需要以明确的顺序完成操作,则比在“系列”中执行此操作更好的方法是对您排队的操作数量使用“限制”。有async.eachLimit() 可以做到这一点。

这里的调用约定似乎有点奇怪,所以至少对我来说这似乎更清晰:

async.eachLimit(users,500,function(user,callback)
    var model = new Model(
        id: user.id,
        name: 
            first: user.fname,
            last: user.lname
        
    );
    model.save(function(err, model) 
        console.log("saving user: " + model.id);
        callback(err);
    );
, function(err) 
    if (err) 
        console.log("there was a problem");
     else 
        console.log("all successful");
    
);

或作为基本翻译的咖啡脚本:

async.eachLimit users, 500, ((user, callback) ->
  model = new Model(
    id: user.id
    name:
      first: user.fname
      last: user.lname
  )
  model.save (err, model) ->
    console.log "saving user: " + model.id
    callback err
    return

  return
), (err) ->
  if err
    console.log "there was a problem"
  else
    console.log "all successful"
  return

最后的回调将在所有回调返回后处理,但你是在“限制”你向 mongoose 甚至 MongoDB 抛出的内容。

您可能还想查看 MongoDB 的 Bulk Operations API,除非您明确需要使用模型中的“验证”功能或其他功能。这实质上允许您一次发送“一批”插入,而不是“一次一个”地将每个文档发送到数据库。

这里是人为的例子,使用 eachSeries 但实际的“写入”是分组的:

var async = require("async"),
    mongoose = require("mongoose"),
    Schema = mongoose.Schema;

mongoose.connect('mongodb://localhost/test');

var tenSchema = new Schema(
  value: Number
);

var Ten = mongoose.model( "Ten", tenSchema, "ten" );

var ten = [1,2,4,5,6,7,8,9,10];
var pos = 0;

mongoose.connection.on("open",function(err,conn) 

  var bulk = Ten.collection.initializeOrderedBulkOp();

  async.eachSeries(ten,function(item,callback) 

    bulk.insert( "value": item );
    pos++;

    if ( pos % 2 == 0 ) 
      bulk.execute(function(err,res) 
        pos = 0;
        bulk = Ten.collection.initializeOrderedBulkOp();
        callback(err);
      );
     else 
      callback();
    

  ,function(err) 

    if (err)
      throw err;

    if ( pos != 0 ) 
      bulk.execute(function(err,result) 
        console.log("done");
      );
     else 
      console.log("done");
    

  );

);

因此,在您的情况下,只需“向上”计算模数的值,例如 500,这将处理数组,但每 500 个项目只写入一次数据库。

唯一需要注意的是这是一个本机驱动程序函数,而不是使用 mongoose API。因此,您需要小心(在迁移脚本或类似的情况下)以确保在引用这些方法之前建立当前连接。这里人为的方法是寻找“开放”,但基本上你只是想确定,通常通过其他方式。

您可能会更喜欢并行“批量写入”队列,但总体性能应该比任何其他方法更好,而无需进一步研究。

【讨论】:

出色的答案,非常感谢!希望我能接受你和 dylant 的回答。 @AbeMiessler 够公平的。我主要是为了提供替代方案来简单地处理密集的 I/O 串行。但另一个是一个公平的解释,.each() 只是在无限制地排队工作。所以.eachSeries() 会避免这种情况,.eachLimit() 将并发操作减少到工作数量,批量更新减少 I/O 开销。

以上是关于为啥 Node.js 会以这种方式执行?的主要内容,如果未能解决你的问题,请参考以下文章

为啥Node Js mongoose文档删除不等待不执行?

为啥 node.js 配置文件的大小会是数 GB? webstorm 处理的方式

为啥 Node.js 的 Mysql Native 驱动程序的查询执行时间如此之长?有啥选择吗?

为啥 Node.js 没有原生 DOM?

我应该 node.js 监听哪些端口?如何以及为啥?

为啥我的 javascript (node.js) 给了我不正确的时间戳?