Node reading file in specified chunk size

Posted: 2014-09-26 11:20:21

【Question】:

Goal: upload large files to AWS Glacier without holding the entire file in memory.

I'm currently uploading to Glacier with fs.readFileSync() and things work fine. However, I need to handle files larger than 4 GB, and I'd like to upload multiple chunks in parallel, which means moving to a multipart upload. I can choose the part size, but Glacier requires every part to be the same size (except the last one).

This thread suggests that I can set a chunk size on a read stream, but I'm not actually guaranteed to get it.

Any information on how to get consistent parts without reading the whole file into memory and splitting it manually?

Assuming I can get that far, I was just going to use cluster with a handful of processes pulling chunks off the stream as fast as they can upload them to AWS. If that seems like the wrong way to parallelize the work, I'd love suggestions there.
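For illustration, the equal-size part layout such a multipart upload needs can be computed up front. This is only a sketch (the function name is made up, not an AWS SDK call):

```javascript
// Sketch: compute equal-size part boundaries for a multipart upload.
// Every part is partSize bytes except possibly the last one.
function partRanges(fileSize, partSize) {
  const ranges = [];
  for (let start = 0; start < fileSize; start += partSize) {
    // end is inclusive, clamped to the last byte of the file
    ranges.push({ start, end: Math.min(start + partSize, fileSize) - 1 });
  }
  return ranges;
}
```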

【Comments】:

【Answer 1】:

If nothing else, you can use fs.open(), fs.read(), and fs.close() manually. Example:

var fs = require('fs');

var CHUNK_SIZE = 10 * 1024 * 1024, // 10MB
    buffer = Buffer.alloc(CHUNK_SIZE),
    filePath = '/tmp/foo';

fs.open(filePath, 'r', function(err, fd) {
  if (err) throw err;
  function readNextChunk() {
    fs.read(fd, buffer, 0, CHUNK_SIZE, null, function(err, nread) {
      if (err) throw err;

      if (nread === 0) {
        // done reading file, do any necessary finalization steps

        fs.close(fd, function(err) {
          if (err) throw err;
        });
        return;
      }

      var data;
      if (nread < CHUNK_SIZE)
        data = buffer.slice(0, nread);
      else
        data = buffer;

      // do something with `data`, then call `readNextChunk();`
    });
  }
  readNextChunk();
});

【Comments】:

While this answer is technically correct, using plain file descriptors gives up all the advantages of stream events, which are very useful when you're trying to read/write files in an async codebase.

Running this code today produces the following warning: "(node:25440) [DEP0005] DeprecationWarning: Buffer() is deprecated due to security and usability issues. Please use the Buffer.alloc(), Buffer.allocUnsafe(), or Buffer.from() methods instead."

@theycallmemorty This answer long predates that deprecation. I've now updated it for modern versions of Node.

You might prefer the more readable createReadStream solution.

@Poyoman This solution is the only way to guarantee the chunk size, which is what the OP asked for. If you don't need a specific chunk size, then yes, streaming is much easier.

【Answer 2】:

You might consider the snippet below, where we read the file in chunks of 1024 bytes

var fs = require('fs');

var data = '';

var readStream = fs.createReadStream('/tmp/foo.txt', { highWaterMark: 1 * 1024, encoding: 'utf8' });

readStream.on('data', function(chunk) {
    data += chunk;
    console.log('chunk Data : ');
    console.log(chunk); // your processing chunk logic will go here
}).on('end', function() {
    console.log('###################');
    console.log(data);
    // here you see all data processed at end of file
});

Please note: highWaterMark is the parameter used for the chunk size. Hope this helps!

Web references: https://stackabuse.com/read-files-with-node-js/ and Changing readstream chunksize

【Comments】:

Finally, a good example of Node streams that doesn't look like over-abstracted nonsense and doesn't bore me with an explanation of what a binary number is.

【Answer 3】:

Based on mscdex's answer, here is a module that uses the synchronous alternative plus a StringDecoder to correctly parse UTF-8.

The problem with a readableStream is that, in order to use it, you have to convert your entire project to use async emitters and callbacks. If you're coding something simple, like a small CLI in nodejs, it doesn't make sense.

//usage
let file = new UTF8FileReader()
file.open('./myfile.txt', 1024)
while (file.isOpen) {
    let stringData = file.readChunk()
    console.log(stringData)
}

//--------------------
// UTF8FileReader.ts
//--------------------
import * as fs from 'fs';
import { StringDecoder, NodeStringDecoder } from "string_decoder";

export class UTF8FileReader {

    filename: string;
    isOpen: boolean = false;
    private chunkSize: number;
    private fd: number; //file descriptor from fs.openSync
    private readFilePos: number;
    private readBuffer: Buffer;

    private utf8decoder: NodeStringDecoder

    /**
     * open the file | throw
     * @param filename
     */
    open(filename, chunkSize: number = 16 * 1024) {

        this.chunkSize = chunkSize;

        try {
            this.fd = fs.openSync(filename, 'r');
        }
        catch (e) {
            throw new Error("opening " + filename + ", error:" + e.toString());
        }

        this.filename = filename;
        this.isOpen = true;

        this.readBuffer = Buffer.alloc(this.chunkSize);
        this.readFilePos = 0;

        //a StringDecoder is a buffered object that ensures complete UTF-8 multibyte decoding from a byte buffer
        this.utf8decoder = new StringDecoder('utf8')

    }

    /**
     * read another chunk from the file
     * return the decoded UTF8 into a string
     * (or throw)
     * */
    readChunk(): string {

        let decodedString = '' //return '' by default

        if (!this.isOpen) {
            return decodedString;
        }

        let readByteCount: number;
        try {
            readByteCount = fs.readSync(this.fd, this.readBuffer, 0, this.chunkSize, this.readFilePos);
        }
        catch (e) {
            throw new Error("reading " + this.filename + ", error:" + e.toString());
        }

        if (readByteCount) {
            //some data read, advance readFilePos
            this.readFilePos += readByteCount;
            //get only the read bytes (if we reached the end of the file)
            const onlyReadBytesBuf = this.readBuffer.slice(0, readByteCount);
            //correctly decode as utf8, and store in decodedString
            //yes, the api is called "write", but it decodes a string - it's a write-decode-and-return the string kind-of-thing :)
            decodedString = this.utf8decoder.write(onlyReadBytesBuf);
        }
        else {
            //read returns 0 => all bytes read
            this.close();
        }
        return decodedString
    }

    close() {
        if (!this.isOpen) {
            return;
        }
        fs.closeSync(this.fd);
        this.isOpen = false;
        this.utf8decoder.end();
    }
}
If you don't have typescript yet, here is the transpiled .js code:

// UTF8FileReader.js
"use strict";
Object.defineProperty(exports, "__esModule", { value: true });
exports.UTF8FileReader = void 0;
//--------------------
// UTF8FileReader
//--------------------
const fs = require("fs");
const string_decoder_1 = require("string_decoder");
class UTF8FileReader {
    constructor() {
        this.isOpen = false;
    }
    /**
     * open the file | throw
     * @param filename
     */
    open(filename, chunkSize = 16 * 1024) {
        this.chunkSize = chunkSize;
        try {
            this.fd = fs.openSync(filename, 'r');
        }
        catch (e) {
            throw new Error("opening " + filename + ", error:" + e.toString());
        }
        this.filename = filename;
        this.isOpen = true;
        this.readBuffer = Buffer.alloc(this.chunkSize);
        this.readFilePos = 0;
        //a StringDecoder is a buffered object that ensures complete UTF-8 multibyte decoding from a byte buffer
        this.utf8decoder = new string_decoder_1.StringDecoder('utf8');
    }
    /**
     * read another chunk from the file
     * return the decoded UTF8 into a string
     * (or throw)
     * */
    readChunk() {
        let decodedString = ''; //return '' by default
        if (!this.isOpen) {
            return decodedString;
        }
        let readByteCount;
        try {
            readByteCount = fs.readSync(this.fd, this.readBuffer, 0, this.chunkSize, this.readFilePos);
        }
        catch (e) {
            throw new Error("reading " + this.filename + ", error:" + e.toString());
        }
        if (readByteCount) {
            //some data read, advance readFilePos
            this.readFilePos += readByteCount;
            //get only the read bytes (if we reached the end of the file)
            const onlyReadBytesBuf = this.readBuffer.slice(0, readByteCount);
            //correctly decode as utf8, and store in decodedString
            //yes, the api is called "write", but it decodes a string - it's a write-decode-and-return the string kind-of-thing :)
            decodedString = this.utf8decoder.write(onlyReadBytesBuf);
        }
        else {
            //read returns 0 => all bytes read
            this.close();
        }
        return decodedString;
    }
    close() {
        if (!this.isOpen) {
            return;
        }
        fs.closeSync(this.fd);
        this.isOpen = false;
        this.utf8decoder.end();
    }
}
exports.UTF8FileReader = UTF8FileReader;

【Comments】:
