如何防止 fs.createReadStream 打开超过 x 个文件?

Posted

技术标签:

【中文标题】如何防止 fs.createReadStream 打开超过 x 个文件?【英文标题】:How to prevent more than x number of files from being opened by fs.createReadStream? 【发布时间】:2021-10-27 17:09:54 【问题描述】:

在我的代码中,我有一个嵌套的fs.createReadStream(...)readline.on("line"... 事件中被调用。最初我让程序彻底失败,因为 fs 将创建超过 1000 个读取流,并且会触发故障安全停止。

我在打开读取流之前实施了一项检查,以查看打开的文件数量,如果打开的文件数量超过我想要的数量,我会暂停直到它们被关闭。然而,由于代码是异步调用的,在其他任务开始停止之前,似乎有超过 600 个检查通过并打开文件。我无法找到限制读取流数量的方法,任何解决方案都将不胜感激。

整个脚本的目的是逐行比较两个非常大的文件。每个文件有超过 20 万行。

我已经排除了 formatLinecheckLine 函数,并重构了我的代码以尝试保持与问题相关的内容。

var fs = require( 'fs' );
const readline = require('readline');

const files = open: 0;
const wait = ms => new Promise(r => setTimeout(r, ms));
//for debugging purposes, just looking at how many files are open
setInterval(() => console.log(files.open), 500);
async function stallIfNeeded() 
  while (files.open > 50) 
    await wait(500);
  

async function checkLineExists(line) 
  await stallIfNeeded();
  let readStream = fs.createReadStream( "./largeFileOfUncertianValues.csv" );
  files.open = files.open + 1;
  const rl = readline.createInterface(
    input: readStream,
    crlfDelay: Infinity
  );
  const hasLine = await new Promise(async res => 
    let found = false;
    rl.on( 'close', () => 
      files.open = files.open - 1;
      res(found);
    );
    rl.on("line", oline => 
      const hasLine = oline.includes(line);
      if (hasLine) 
        found = true;
        rl.close();
      
    );
  );
  return hasLine;


(async () => 
  const sourceStream = fs.createReadStream('largeFileOfKnownValues.csv');

  const sourceRl = readline.createInterface(
    input: sourceStream,
    crlfDelay: Infinity
  );

  let writeStream = fs.createWriteStream("missing.csv")
  let cntr = 0;
  await new Promise(sRes => 
    sourceRl.on("line", async line => 
      //these are conditions left out for simplicity.
      //I need to format every line and check to make sure it is
      //something I want to search for.
      const lineFormatted = formatLine(line);
      const skipLine = checkLine(lineFormatted);
      if (skipLine) return;

      const exists = await checkLineExists(lineFormatted);

      if (!exists) console.log("doesn't exists");
      if (!exists) writeStream.write( line + "\n" );
      if (++cntr % 50 == 0) 
        console.log("another 50 done");
        console.log(cntr + " so far");
      
    );
    sourceRl.on("close", sRes);
  );
)();

【问题讨论】:

这不太可能是解决此问题的正确设计方法。您不想为源文件中的每一行重新打开比较文件。你能从头开始更详细地描述这个问题吗?您是否只是想查看两个文件的内容和顺序是否相同?或者只是这两个文件包含相同的行,但可以在任一文件中以任何顺序排列? 此外,这种类型的算法将受益于能够使用一些内存进行缓存。你可以允许算法使用多少内存? 是的,jfriend00。我在想也许我需要在这里使用数据库而不是文本文件。这两个文件基本上是文件路径。然而,在这两个位置,路径可能不同但仍然有效。即“Hello there”可以匹配“hEllo_there”,这就是为什么我有 formatLine 函数,它查看另一个文件中的匹配内容,并将其减少到至少应该作为oline.includes(formattedPath) 传递的内容。我在我的家用电脑上做这个,我有 16gb 的内存,但我从来没有研究过内存兑现 不确定的值也比已知的值多,我只想知道缺少什么。 你没有回答我的主要问题。您是否关心文件中的顺序,或者您只是想查看源文件中的路径是否包含在目标文件中的任何位置?这种类型的问题已经为数据库做好了准备。只需将一个文件中的所有路径放入数据库,然后对于另一个文件中的每一行,您只需查询数据库。 【参考方案1】:

如果您真的想最大限度地减少内存使用和/或支持具有良好性能的任意大文件,那么您确实应该将其中一个文件加载到某种可以对数据进行索引查找的数据库中。然后,您可以逐行循环浏览一个文件,规范化该行,然后运行查询以查看它是否存在于其他数据中。

如果没有,这里有几种没有数据库的方法。第一个,将一组数据加载到内存中的 Set 对象中,然后在第二个中逐行检查以查看 Set 中的内容(本质上是内存中的数据库查找)。

const fs = require('fs');
const readline = require('readline');

function normalizeLine(line) 
    return line.toLowerCase();



async function compare(sourceFile, targetFile) 
    // read in all source lines into a normalized Set
    const source = readline.createInterface( input: fs.createReadStream(sourceFile) );
    const sourceLines = new Set();
    for await (const line of source) 
        sourceLines.add(normalizeLine(line));
    

    const notFounds = [];

    const target = readline.createInterface( input: fs.createReadStream(targetFile) );
    for await (const line of target) 
        if (!sourceLines.has(normalizeLine(line))) 
            notFounds.push(line);
        
    
    return notFounds;


compare("source.txt", "target.txt").then(result => 
    if (!result.length) 
        console.log("All target lines found in source");
     else 
        console.log("Not found in source", result);
    
).catch(err => 
    console.log(err);
);

第二个,使用您的方法,即逐行循环浏览第一个文件,然后为第一个文件的每一行逐行循环浏览第二个文件。对于任何大型数据集,这将非常缓慢,但对于大型文件,它将无限扩展。

这使用了 readline 的 promise 接口逐行循环,它 awaits 关闭事件以避免打开文件的任何堆积。

const fs = require('fs');
const readline = require('readline');

function normalizeLine(line) 
    return line.toLowerCase();


async function compare(sourceFile, targetFile) 
    // read in all source lines into a normalized Set
    const source = readline.createInterface( input: fs.createReadStream(targetFile) );
    const notFounds = [];
    for await (const line of source) 
        let found = await findLineInFile(sourceFile, line);
        if (!found) 
            notFounds.push(line);
        
    
    return notFounds;


compare("source.txt", "target.txt").then(result => 
    if (!result.length) 
        console.log("All target lines found in source");
     else 
        console.log("Not found in source", result);
    
).catch(err => 
    console.log(err);
);

function streamDestroy(stream) 
    return new Promise((resolve, reject) => 
        stream.once('close', resolve);
        stream.once('error', reject);
        stream.destroy();
    );


async function findLineInFile(filename, targetLine) 
    const lookLine = normalizeLine(targetLine);
    const stream = fs.createReadStream(filename);

    const source = readline.createInterface( input: stream );
    for await (const line of source) 
        if (normalizeLine(line) === lookLine) 
            await streamDestroy(stream);
            return true;
        
    
    return false;

【讨论】:

这是一个很棒的解决方案,我相信我可以看到如何防止从这里打开更多文件,特别是使用 readline 中的for of 循环,以及确认流已关闭的 streamdestroy 函数.感谢您的时间。您对部分匹配有任何其他意见吗?对于逐行,我可以轻松地将条件从 === 更改为 .includes 但是,也许您对类似于 [].find for Sets 的方法有更好的建议。 @AsyncAwaitFetch - 您必须更详细地描述您尝试对部分匹配执行的操作,并显示应该匹配和不应该匹配的示例。 Set 没有任何部分匹配功能,因此如果您尝试这样做,您将无法使用 Set。

以上是关于如何防止 fs.createReadStream 打开超过 x 个文件?的主要内容,如果未能解决你的问题,请参考以下文章

Node.js 中的 fs.ReadStream 和 fs.createReadStream 有啥区别吗?

为啥 fs.createReadStream ... pipe(res) 锁定读取文件?

fs.createreadstream 中是不是有类似“结束”的“开始”?

node.js 中 fs.createReadStream 与 fs.readFile 的优缺点是啥?

request() 和 fs.createReadStream() 没有返回“正确”的值

NodeJs - 使用 fs.createReadStream 读取文件时出错