在 NodeJS 中将 200'000 行以上的大型 csv 文件插入 MongoDB
Posted
技术标签:
【中文标题】在 NodeJS 中将 200\'000 行以上的大型 csv 文件插入 MongoDB【英文标题】:Insert a large csv file, 200'000 rows+, into MongoDB in NodeJS在 NodeJS 中将 200'000 行以上的大型 csv 文件插入 MongoDB 【发布时间】:2017-06-11 00:20:37 【问题描述】:我正在尝试解析一个大的 csv 文件并将其插入 MongoDB,但是当文件扩展 100'000 行时,我得到了来自服务器的错误响应。而且我需要插入的文件通常在 200'000 行以上。
我已经尝试过批量插入 (insertMany) 和 Babyparse(Papaparse) 流式方法来逐行插入文件。但效果不佳。
节点接口:
router.post('/csv-upload/:id', multipartMiddleware, function(req, res)
// Post vartiables
var fileId = req.params.id;
var csv = req.files.files.path;
// create a queue object with concurrency 5
var q = async.queue(function(row, callback)
var entry = new Entry(row);
entry.save();
callback();
, 5);
baby.parseFiles(csv,
header: true, // Includes header in JSON
skipEmptyLines: true,
fastMode: true,
step: function(results, parser)
results.data[0].id = fileId;
q.push(results.data[0], function (err)
if (err) throw err;
);
,
complete: function(results, file)
console.log("Parsing complete:", results, file);
q.drain = function()
console.log('All items have been processed');
res.send("Completed!");
;
);
);
这种流式处理方法会导致:POST SERVER net::ERR_EMPTY_RESPONSE
不确定我是否正确使用了 async.queue。
有没有更好、更有效的方法来做到这一点,或者我做错了什么?
快递服务器:
// Dependencies
var express = require('express');
var path = require('path');
var bodyParser = require('body-parser');
var routes = require('./server/routes');
var mongoose = require("mongoose");
var babel = require("babel-core/register");
var compression = require('compression');
var PORT = process.env.PORT || 3000;
// Include the cluster module
var cluster = require('cluster');
mongoose.connect(process.env.MONGOLAB_URI || 'mongodb://localhost/routes');
// Code to run if we're in the master process
if (cluster.isMaster)
// Count the machine's CPUs
var cpuCount = require('os').cpus().length;
// Create a worker for each CPU
for (var i = 0; i < cpuCount; i += 1)
cluster.fork();
// Code to run if we're in a worker process
else
// Express
var app = express();
app.use(bodyParser.json(limit: '50mb'));
app.use(bodyParser.urlencoded(limit: '50mb', extended: true));
// Compress responses
app.use(compression());
// Used for production build
app.use(express.static(path.join(__dirname, 'public')));
routes(app);
// Routes
app.use('/api', require('./server/routes/api'));
app.all('/*', function(req, res)
res.sendFile(path.join(__dirname, 'public/index.html'));
);
// Start server
app.listen(PORT, function()
console.log('Server ' + cluster.worker.id + ' running on ' + PORT);
);
【问题讨论】:
我为另一个问题所做的这个解决方案可能对你有帮助 - ***.com/questions/32386118/… - OP 报告在大约 1 分钟内从 csv 加载 700k 记录 谢谢@Robbie!我会看看那个。 那个帖子帮了我很多@Robbie,谢谢。 很高兴它帮助了你 【参考方案1】:处理导入:
很好的问题,根据我的经验,将 csv 插入 mongo 的最快方法是通过命令行:
mongoimport -d db_name -c collection_name --type csv --file file.csv --headerline
我不相信 mongoose 有调用 mongoimport 的方法(如果我错了,请有人纠正我)
但是直接通过node调用就很简单了:
var exec = require('child_process').exec;
var cmd = 'mongoimport -d db_name -c collection_name --type csv --file file.csv --headerline';
exec(cmd, function(error, stdout, stderr)
// do whatever you need during the callback
);
以上内容必须修改为动态的,但应该是不言自明的。
处理上传:
从前端客户端上传文件是另一个挑战。
如果您向服务器发出请求并且在 60 秒内没有得到响应(可能是您在上面提到的),大多数浏览器都会超时
一种解决方案是打开一个套接字连接(在 npm 中搜索 socket.io)以获取详细信息。这将创建与服务器的持续连接,并且不受超时限制。
如果上传不是问题,并且超时是由于解析/插入速度慢,那么一旦您实施了上述操作,您就不必担心这一点。
其他注意事项:
我不确定您需要向用户发送回什么,或者需要进行什么解析。但这既可以在正常的请求/响应周期之外完成,也可以在一个请求/响应周期中需要时在套接字连接期间进行处理。
【讨论】:
感谢您的回复。听起来很有趣。现在上传文件不是问题。您是否知道 mongoimport 将如何处理 csv,因为该文件将始终如下所示: version task concept price 1 1 3 1 1 1 2 2 1 1 1 1 etc etc etc etc ... 要查看它如何处理导入,我将使用示例 csv 进行测试。将 csv 放在您的服务器上并运行上面提到的 mongoimport 命令,并引用您的示例 csv。然后通过命令行打开mongo,查看插入的数据。此外,mongo 文档是一个很好的参考 docs.mongodb.com/manual/reference/program/mongoimport 谢谢。这种方法的另一个问题是我需要动态添加 id,因为它不包含在文件中。 你是如何使用id的?每个 id 是不同的集合,还是 id 是您在 csv.xml 中添加的字段。如果是前者,则使用 id 作为集合名称,如果是第二个选项,您可以在 mongoimport exec 命令的回调中进行简单的 mongoose 更新。以上是关于在 NodeJS 中将 200'000 行以上的大型 csv 文件插入 MongoDB的主要内容,如果未能解决你的问题,请参考以下文章
在NodeJS中将许多文件中的JSON对象插入MongoDB的最有效方法
如何使用 ag-grid 导出到具有 Angular 6 的大数据(50,000 行)的 excel 功能(内存不足错误)?