在 NodeJS 中将 200'000 行以上的大型 csv 文件插入 MongoDB

Posted

技术标签:

【中文标题】在 NodeJS 中将 200\'000 行以上的大型 csv 文件插入 MongoDB【英文标题】:Insert a large csv file, 200'000 rows+, into MongoDB in NodeJS在 NodeJS 中将 200'000 行以上的大型 csv 文件插入 MongoDB 【发布时间】:2017-06-11 00:20:37 【问题描述】:

我正在尝试解析一个大的 csv 文件并将其插入 MongoDB,但是当文件扩展 100'000 行时,我得到了来自服务器的错误响应。而且我需要插入的文件通常在 200'000 行以上。

我已经尝试过批量插入 (insertMany) 和 Babyparse(Papaparse) 流式方法来逐行插入文件。但效果不佳。

节点接口:

router.post('/csv-upload/:id', multipartMiddleware, function(req, res) 

    // Post vartiables
    var fileId = req.params.id;
    var csv = req.files.files.path;

    // create a queue object with concurrency 5
    var q = async.queue(function(row, callback) 
        var entry = new Entry(row);
        entry.save();
        callback();
    , 5);

    baby.parseFiles(csv, 
        header: true, // Includes header in JSON
        skipEmptyLines: true,
        fastMode: true,
        step: function(results, parser) 
            results.data[0].id = fileId;

            q.push(results.data[0], function (err) 
                if (err) throw err;
            );
        ,
        complete: function(results, file) 
            console.log("Parsing complete:", results, file);
            q.drain = function() 
                console.log('All items have been processed');
                res.send("Completed!");
            ;
        
    );
);

这种流式处理方法会导致:POST SERVER net::ERR_EMPTY_RESPONSE

不确定我是否正确使用了 async.queue。

有没有更好、更有效的方法来做到这一点,或者我做错了什么?

快递服务器:

// Dependencies
var express = require('express');
var path = require('path');
var bodyParser = require('body-parser');
var routes = require('./server/routes');
var mongoose = require("mongoose");
var babel = require("babel-core/register");
var compression = require('compression');
var PORT = process.env.PORT || 3000;
// Include the cluster module
var cluster = require('cluster');

mongoose.connect(process.env.MONGOLAB_URI || 'mongodb://localhost/routes');

  // Code to run if we're in the master process
 if (cluster.isMaster) 

    // Count the machine's CPUs
    var cpuCount = require('os').cpus().length;

    // Create a worker for each CPU
    for (var i = 0; i < cpuCount; i += 1) 
        cluster.fork();
    

 // Code to run if we're in a worker process
  else 
    // Express
    var app = express();

    app.use(bodyParser.json(limit: '50mb'));
    app.use(bodyParser.urlencoded(limit: '50mb', extended: true));

    // Compress responses
    app.use(compression());

    // Used for production build
    app.use(express.static(path.join(__dirname, 'public')));

    routes(app);

    // Routes
    app.use('/api', require('./server/routes/api'));

    app.all('/*', function(req, res) 
        res.sendFile(path.join(__dirname, 'public/index.html'));
    );

    // Start server
    app.listen(PORT, function() 
        console.log('Server ' + cluster.worker.id + ' running on ' + PORT);
    );

【问题讨论】:

我为另一个问题所做的这个解决方案可能对你有帮助 - ***.com/questions/32386118/… - OP 报告在大约 1 分钟内从 csv 加载 700k 记录 谢谢@Robbie!我会看看那个。 那个帖子帮了我很多@Robbie,谢谢。 很高兴它帮助了你 【参考方案1】:

处理导入:

很好的问题,根据我的经验,将 csv 插入 mongo 的最快方法是通过命令行:

mongoimport -d db_name -c collection_name --type csv --file file.csv --headerline 

我不相信 mongoose 有调用 mongoimport 的方法(如果我错了,请有人纠正我)

但是直接通过node调用就很简单了:

var exec = require('child_process').exec;
var cmd = 'mongoimport -d db_name -c collection_name --type csv --file file.csv --headerline';

exec(cmd, function(error, stdout, stderr) 
  // do whatever you need during the callback
);

以上内容必须修改为动态的,但应该是不言自明的。

处理上传:

从前端客户端上传文件是另一个挑战。

如果您向服务器发出请求并且在 60 秒内没有得到响应(可能是您在上面提到的),大多数浏览器都会超时

一种解决方案是打开一个套接字连接(在 npm 中搜索 socket.io)以获取详细信息。这将创建与服务器的持续连接,并且不受超时限制。

如果上传不是问题,并且超时是由于解析/插入速度慢,那么一旦您实施了上述操作,您就不必担心这一点。

其他注意事项:

我不确定您需要向用户发送回什么,或者需要进行什么解析。但这既可以在正常的请求/响应周期之外完成,也可以在一个请求/响应周期中需要时在套接字连接期间进行处理。

【讨论】:

感谢您的回复。听起来很有趣。现在上传文件不是问题。您是否知道 mongoimport 将如何处理 csv,因为该文件将始终如下所示: version task concept price 1 1 3 1 1 1 2 2 1 1 1 1 etc etc etc etc ... 要查看它如何处理导入,我将使用示例 csv 进行测试。将 csv 放在您的服务器上并运行上面提到的 mongoimport 命令,并引用您的示例 csv。然后通过命令行打开mongo,查看插入的数据。此外,mongo 文档是一个很好的参考 docs.mongodb.com/manual/reference/program/mongoimport 谢谢。这种方法的另一个问题是我需要动态添加 id,因为它不包含在文件中。 你是如何使用id的?每个 id 是不同的集合,还是 id 是您在 csv.xml 中添加的字段。如果是前者,则使用 id 作为集合名称,如果是第二个选项,您可以在 mongoimport exec 命令的回调中进行简单的 mongoose 更新。

以上是关于在 NodeJS 中将 200'000 行以上的大型 csv 文件插入 MongoDB的主要内容,如果未能解决你的问题,请参考以下文章

在NodeJS中将许多文件中的JSON对象插入MongoDB的最有效方法

如何使用 ag-grid 导出到具有 Angular 6 的大数据(50,000 行)的 excel 功能(内存不足错误)?

在mysql Sequelize + Nodejs中将数组和对象转换为批量插入

pandas 中的大而持久的 DataFrame

BigQuery 行限制

程序不适用于 C 中的大文件