在 Node.js 中解析巨大的日志文件 - 逐行读取
Posted
技术标签:
【中文标题】在 Node.js 中解析巨大的日志文件 - 逐行读取【英文标题】:Parsing huge logfiles in Node.js - read in line-by-line 【发布时间】:2013-04-07 07:34:34 【问题描述】:我需要对 javascript/Node.js 中的大型(5-10 Gb)日志文件进行一些解析(我正在使用 Cube)。
日志如下所示:
10:00:43.343423 I'm a friendly log message. There are 5 cats, and 7 dogs. We are in state "SUCCESS".
我们需要读取每一行,做一些解析(例如去除5
、7
和SUCCESS
),然后使用他们的JS客户端将这些数据泵入Cube(https://github.com/square/cube)。
首先,Node 中逐行读取文件的规范方式是什么?
这似乎是网上相当普遍的问题:
http://www.quora.com/What-is-the-best-way-to-read-a-file-line-by-line-in-node-js Read a file one line at a time in node.js?很多答案似乎都指向一堆第三方模块:
https://github.com/nickewing/line-reader https://github.com/jahewson/node-byline https://github.com/pkrumins/node-lazy https://github.com/Gagle/Node-BufferedReader但是,这似乎是一项相当基本的任务 - 当然,在 stdlib 中有一种简单的方法可以逐行读取文本文件?
其次,我需要处理每一行(例如,将时间戳转换为 Date 对象,并提取有用的字段)。
如何做到这一点,最大限度地提高吞吐量?是否有某种方式不会阻止读取每一行或将其发送到 Cube?
第三 - 我猜想使用字符串拆分,JS 等价于 contains (IndexOf != -1?) 会比正则表达式快很多?有没有人在 Node.js 中解析大量文本数据方面有丰富的经验?
干杯, 维克多
【问题讨论】:
我在节点中构建了一个日志解析器,它接收一堆内置“捕获”的正则表达式字符串并输出到 JSON。如果您想进行计算,您甚至可以在每次捕获时调用函数。它可能会做你想做的事:npmjs.org/package/logax 更好的比较betterprogramming.pub/… 【参考方案1】:我搜索了一种使用流逐行解析超大文件 (gbs) 的解决方案。所有第三方库和示例都不适合我的需要,因为它们不是逐行处理文件(如 1 、 2 、 3 、 4 ..)或将整个文件读入内存
以下解决方案可以使用流和管道逐行解析非常大的文件。为了测试,我使用了一个包含 17.000.000 条记录的 2.1 gb 文件。 Ram 使用量不超过 60 mb。
首先,安装event-stream 包:
npm install event-stream
然后:
var fs = require('fs')
, es = require('event-stream');
var lineNr = 0;
var s = fs.createReadStream('very-large-file.csv')
.pipe(es.split())
.pipe(es.mapSync(function(line)
// pause the readstream
s.pause();
lineNr += 1;
// process line here and call s.resume() when rdy
// function below was for logging memory usage
logMemoryUsage(lineNr);
// resume the readstream, possibly from a callback
s.resume();
)
.on('error', function(err)
console.log('Error while reading file.', err);
)
.on('end', function()
console.log('Read entire file.')
)
);
请告诉我进展如何!
【讨论】:
仅供参考,此代码不是同步的。它是异步的。如果在代码的最后一行之后插入console.log(lineNr)
,它将不会显示最终的行数,因为文件是异步读取的。
谢谢,这是我能找到的唯一解决方案,它实际上在应该暂停和恢复的时候。 Readline 没有。
很棒的例子,它确实暂停了。此外,如果您决定提前停止读取文件,您可以使用s.end();
工作就像一个魅力。用它来索引 1.5 亿个文档到 elasticsearch 索引。 readline
模块很痛苦。它不会暂停,并且每次在 40-50 百万之后都会导致失败。浪费了一天。非常感谢您的回答。这个效果很好
事件流被入侵:medium.com/intrinsic/… 但 4+ 显然是安全的 blog.npmjs.org/post/180565383195/…【参考方案2】:
您可以使用内置的readline
包,请参阅文档here。我使用stream 创建一个新的输出流。
var fs = require('fs'),
readline = require('readline'),
stream = require('stream');
var instream = fs.createReadStream('/path/to/file');
var outstream = new stream;
outstream.readable = true;
outstream.writable = true;
var rl = readline.createInterface(
input: instream,
output: outstream,
terminal: false
);
rl.on('line', function(line)
console.log(line);
//Do your stuff ...
//Then write to outstream
rl.write(cubestuff);
);
处理大型文件需要一些时间。告诉它是否有效。
【讨论】:
如所写,倒数第二行失败,因为未定义立方体。 使用readline
,是否可以暂停/恢复读取流以在“do stuff”区域执行异步操作?
@jchook readline
在我尝试暂停/恢复时给了我很多问题。如果下游进程较慢,它不会正确暂停流,从而产生很多问题【参考方案3】:
我真的很喜欢@gerard 的答案,这实际上应该是这里的正确答案。我做了一些改进:
代码在一个类中(模块化) 包含解析 如果有异步作业被链接到读取 CSV(如插入 DB 或 HTTP 请求),则可以向外部提供恢复功能 以块/批量大小读取 用户可以声明。我也处理了流中的编码,以防万一 您有不同编码的文件。代码如下:
'use strict'
const fs = require('fs'),
util = require('util'),
stream = require('stream'),
es = require('event-stream'),
parse = require("csv-parse"),
iconv = require('iconv-lite');
class CSVReader
constructor(filename, batchSize, columns)
this.reader = fs.createReadStream(filename).pipe(iconv.decodeStream('utf8'))
this.batchSize = batchSize || 1000
this.lineNumber = 0
this.data = []
this.parseOptions = delimiter: '\t', columns: true, escape: '/', relax: true
read(callback)
this.reader
.pipe(es.split())
.pipe(es.mapSync(line =>
++this.lineNumber
parse(line, this.parseOptions, (err, d) =>
this.data.push(d[0])
)
if (this.lineNumber % this.batchSize === 0)
callback(this.data)
)
.on('error', function()
console.log('Error while reading file.')
)
.on('end', function()
console.log('Read entirefile.')
))
continue ()
this.data = []
this.reader.resume()
module.exports = CSVReader
所以基本上,这就是你将如何使用它:
let reader = CSVReader('path_to_file.csv')
reader.read(() => reader.continue())
我用一个 35GB 的 CSV 文件对此进行了测试,它对我有用,这就是为什么我选择在 @gerard 的答案上构建它,欢迎提供反馈。
【讨论】:
花了多少时间? 显然,这缺少pause()
调用,不是吗?
另外,这不会调用回调函数。因此,如果 batchSize 为 100,文件大小为 150,则只会处理 100 个项目。我错了吗?【参考方案4】:
我使用https://www.npmjs.com/package/line-by-line 从文本文件中读取超过 1 000 000 行。在这种情况下,占用的 RAM 容量约为 50-60 兆字节。
const LineByLineReader = require('line-by-line'),
lr = new LineByLineReader('big_file.txt');
lr.on('error', function (err)
// 'err' contains error object
);
lr.on('line', function (line)
// pause emitting of lines...
lr.pause();
// ...do your asynchronous line processing..
setTimeout(function ()
// ...and continue emitting lines.
lr.resume();
, 100);
);
lr.on('end', function ()
// All lines are read, file is closed now.
);
【讨论】:
'line-by-line' 比所选答案更节省内存。对于 csv 中的 100 万行,所选答案的节点进程只有 800 兆字节。使用“逐行”,它始终处于 700 年代的低位。该模块还保持代码简洁易读。我总共需要阅读大约 1800 万篇文章,所以每 mb 都很重要! 很遗憾,它使用了自己的事件 'line' 而不是标准的 'chunk',这意味着你将无法使用 'pipe'。 经过数小时的测试和搜索,这是唯一真正停止在lr.cancel()
方法上的解决方案。在 1ms 内读取 5Gig 文件的前 1000 行。太棒了!!!!【参考方案5】:
Node.js 文档提供了一个使用 Readline 模块的非常优雅的示例。
Example: Read File Stream Line-by-Line
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface(
input: fs.createReadStream('sample.txt'),
crlfDelay: Infinity
);
rl.on('line', (line) =>
console.log(`Line from file: $line`);
);
注意:我们使用 crlfDelay 选项将 CR LF ('\r\n') 的所有实例识别为单个换行符。
【讨论】:
就我而言,我想使用元素的innerhtml
在 HTML 中显示整个文本,但最后一行总是被截断,即使我的 css 中有overflow: auto
。怎么了?
好的,我明白了。我必须使用比padding
参数更大的padding-bottom
。【参考方案6】:
除了逐行读取大文件外,还可以逐块读取。更多请参考this article
var offset = 0;
var chunkSize = 2048;
var chunkBuffer = new Buffer(chunkSize);
var fp = fs.openSync('filepath', 'r');
var bytesRead = 0;
while(bytesRead = fs.readSync(fp, chunkBuffer, 0, chunkSize, offset))
offset += bytesRead;
var str = chunkBuffer.slice(0, bytesRead).toString();
var arr = str.split('\n');
if(bytesRead = chunkSize)
// the last item of the arr may be not a full line, leave it to the next chunk
offset -= arr.pop().length;
lines.push(arr);
console.log(lines);
【讨论】:
会不会,以下应该是比较而不是赋值:if(bytesRead = chunkSize)
?【参考方案7】:
我也遇到了同样的问题。对比了几个貌似有这个功能的模块,我决定自己做,比我想象的要简单。
要点:https://gist.github.com/deemstone/8279565
var fetchBlock = lineByline(filepath, onEnd);
fetchBlock(function(lines, start) ... ); //linesarray startint lines[0] No.
它涵盖了在闭包中打开的文件,返回的fetchBlock()
将从文件中获取一个块,结束拆分为数组(将处理上次获取的段)。
我已将每个读取操作的块大小设置为 1024。这个可能有bug,但是代码逻辑很明显,你自己试试吧。
【讨论】:
【参考方案8】:node-byline 使用流,所以我更喜欢使用流来处理大文件。
对于您的日期转换,我将使用moment.js。
为了最大限度地提高吞吐量,您可以考虑使用软件集群。有一些不错的模块很好地包装了节点原生集群模块。我喜欢来自isaacs 的cluster-master。例如您可以创建一个由 x 个工作人员组成的集群,它们都计算一个文件。
对于拆分与正则表达式的基准测试,请使用benchmark.js。直到现在我还没有测试过它。 benchmark.js 可作为节点模块使用
【讨论】:
Notemoment.js
现在由于严重的性能问题而失宠,即:其庞大的足迹,无法摇树,以及根深蒂固但现在广泛不喜欢的可变性。甚至its own devs 也几乎把它写下来了。一些不错的选择是date-fns
和day.js
;这是一篇更详细的文章:skypack.dev/blog/2021/02/the-best-javascript-date-libraries>【参考方案9】:
基于this 问题的答案,我实现了一个类,您可以使用该类与fs.readSync()
逐行同步读取文件。您可以使用Q
承诺来“暂停”和“恢复”(jQuery
似乎需要一个 DOM,因此无法使用nodejs
运行它):
var fs = require('fs');
var Q = require('q');
var lr = new LineReader(filenameToLoad);
lr.open();
var promise;
workOnLine = function ()
var line = lr.readNextLine();
promise = complexLineTransformation(line).then(
function() console.log('ok');workOnLine();,
function() console.log('error');
);
workOnLine();
complexLineTransformation = function (line)
var deferred = Q.defer();
// ... async call goes here, in callback: deferred.resolve('done ok'); or deferred.reject(new Error(error));
return deferred.promise;
function LineReader (filename)
this.moreLinesAvailable = true;
this.fd = undefined;
this.bufferSize = 1024*1024;
this.buffer = new Buffer(this.bufferSize);
this.leftOver = '';
this.read = undefined;
this.idxStart = undefined;
this.idx = undefined;
this.lineNumber = 0;
this._bundleOfLines = [];
this.open = function()
this.fd = fs.openSync(filename, 'r');
;
this.readNextLine = function ()
if (this._bundleOfLines.length === 0)
this._readNextBundleOfLines();
this.lineNumber++;
var lineToReturn = this._bundleOfLines[0];
this._bundleOfLines.splice(0, 1); // remove first element (pos, howmany)
return lineToReturn;
;
this.getLineNumber = function()
return this.lineNumber;
;
this._readNextBundleOfLines = function()
var line = "";
while ((this.read = fs.readSync(this.fd, this.buffer, 0, this.bufferSize, null)) !== 0) // read next bytes until end of file
this.leftOver += this.buffer.toString('utf8', 0, this.read); // append to leftOver
this.idxStart = 0
while ((this.idx = this.leftOver.indexOf("\n", this.idxStart)) !== -1) // as long as there is a newline-char in leftOver
line = this.leftOver.substring(this.idxStart, this.idx);
this._bundleOfLines.push(line);
this.idxStart = this.idx + 1;
this.leftOver = this.leftOver.substring(this.idxStart);
if (line !== "")
break;
;
【讨论】:
【参考方案10】:读取/写入文件使用带有本机 nodejs 模块(fs、readline)的流:
const fs = require('fs');
const readline = require('readline');
const rl = readline.createInterface(
input: fs.createReadStream('input.json'),
output: fs.createWriteStream('output.json')
);
rl.on('line', function(line)
console.log(line);
// Do any 'line' processing if you want and then write to the output file
this.output.write(`$line\n`);
);
rl.on('close', function()
console.log(`Created "$this.output.path"`);
);
【讨论】:
【参考方案11】:import * as csv from 'fast-csv';
import * as fs from 'fs';
interface Row
[s: string]: string;
type RowCallBack = (data: Row, index: number) => object;
export class CSVReader
protected file: string;
protected csvOptions =
delimiter: ',',
headers: true,
ignoreEmpty: true,
trim: true
;
constructor(file: string, csvOptions = )
if (!fs.existsSync(file))
throw new Error(`File $file not found.`);
this.file = file;
this.csvOptions = Object.assign(, this.csvOptions, csvOptions);
public read(callback: RowCallBack): Promise < Array < object >>
return new Promise < Array < object >> (resolve =>
const readStream = fs.createReadStream(this.file);
const results: Array < any > = [];
let index = 0;
const csvStream = csv.parse(this.csvOptions).on('data', async (data: Row) =>
index++;
results.push(await callback(data, index));
).on('error', (err: Error) =>
console.error(err.message);
throw err;
).on('end', () =>
resolve(results);
);
readStream.pipe(csvStream);
);
import CSVReader from '../src/helpers/CSVReader';
(async () =>
const reader = new CSVReader('./database/migrations/csv/users.csv');
const users = await reader.read(async data =>
return
username: data.username,
name: data.name,
email: data.email,
cellPhone: data.cell_phone,
homePhone: data.home_phone,
roleId: data.role_id,
description: data.description,
state: data.state,
;
);
console.log(users);
)();
【讨论】:
【参考方案12】:我制作了一个节点模块来异步读取大文件文本或 JSON。 在大文件上测试。
var fs = require('fs')
, util = require('util')
, stream = require('stream')
, es = require('event-stream');
module.exports = FileReader;
function FileReader()
FileReader.prototype.read = function(pathToFile, callback)
var returnTxt = '';
var s = fs.createReadStream(pathToFile)
.pipe(es.split())
.pipe(es.mapSync(function(line)
// pause the readstream
s.pause();
//console.log('reading line: '+line);
returnTxt += line;
// resume the readstream, possibly from a callback
s.resume();
)
.on('error', function()
console.log('Error while reading file.');
)
.on('end', function()
console.log('Read entire file.');
callback(returnTxt);
)
);
;
FileReader.prototype.readJSON = function(pathToFile, callback)
try
this.read(pathToFile, function(txt)callback(JSON.parse(txt)););
catch(err)
throw new Error('json file is not valid! '+err.stack);
;
只需将文件保存为 file-reader.js,然后像这样使用它:
var FileReader = require('./file-reader');
var fileReader = new FileReader();
fileReader.readJSON(__dirname + '/largeFile.json', function(jsonObj)/*callback logic here*/);
【讨论】:
我看起来像是从 Gerard 的回答中复制的。您应该将您复制的部分归功于 Gerard。以上是关于在 Node.js 中解析巨大的日志文件 - 逐行读取的主要内容,如果未能解决你的问题,请参考以下文章