NodeJs through处理流

Posted _成飞

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了NodeJs through处理流相关的知识,希望对你有一定的参考价值。

through2主要是基于streams2(2指的是API稳定性)封装的transform stream。其内部仅是封装了Transform的构造函数,以及更为易用的objectMode模式。

through2并未引用node默认提供的stream模块,而是使用社区中较为流行的readable-stream模块,主要是为了对之前node版本做了兼容支持。

我们可以先看一段关于transform stream使用的基本示例:

const  Transform  = require(\'stream\');

const upperCaseTr = new Transform(
  transform(chunk, encoding, callback) 
    // 转换为大写、push到可读流
    this.push(chunk.toString().toUpperCase());
    callback();
  
);

process.stdin.pipe(upperCaseTr).pipe(process.stdout);

对应的如果换成through2写法。

process.stdin.pipe(through2(function(chunk, enc, callback) 
  this.push(chunk.toString().toUpperCase())
  callback()
)).pipe(process.stdout);

接下来我们再看看对象模式,默认地,stream除了接收 Buffer/String 值。还有一个对象模式(objectMode)的标识,我们可以设置以接受任意Javascript 对象。

const  Transform  = require(\'stream\');
const commaSplitter = new Transform(
  readableObjectMode: true,
  transform(chunk, encoding, callback) 
    this.push(chunk.toString().trim().split(\',\'));
    callback();
  
);
const arrayToObject = new Transform(
  readableObjectMode: true,
  writableObjectMode: true,
  transform(chunk, encoding, callback) 
    const obj = ;
    for(let i=0; i < chunk.length; i+=2) 
      obj[chunk[i]] = chunk[i+1];
    
    this.push(obj);
    callback();
  
);
const objectToString = new Transform(
  writableObjectMode: true,
  transform(chunk, encoding, callback) 
    this.push(JSON.stringify(chunk) + \'\\n\');
    callback();
  
);
process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)

我们在终端输入:

$ a,b,c,d
$ "a":"b","c":"d"

对应的如果换成through2写法。

process.stdin
  .pipe(through2.obj(function (chunk, enc, callback) 
    this.push(chunk.toString().trim().split(\',\'));
    callback();
  ))
  .pipe(through2.obj(function (chunk, enc, callback) 
    const obj = ;
    for(let i=0; i < chunk.length; i+=2) 
      obj[chunk[i]] = chunk[i+1];
    
    this.push(obj);
    callback();
  ))
  .pipe(through2.obj(function (chunk, enc, callback) 
    this.push(JSON.stringify(chunk) + \'\\n\');
    callback();
  ))
  .pipe(process.stdout)

 pipe中间进行处理

var through2  = require(\'through2\');
var stream = through2(write,end)
process.stdin
    .pipe(stream)
    .pipe(process.stdout);

function write(line,_,next)
    this.push(line.toString().toUpperCase())
    next();
)
function end(done)
    done();
)

 

如何推迟流读取调用

【中文标题】如何推迟流读取调用【英文标题】:How to defer stream read invocation 【发布时间】:2019-02-18 13:44:24 【问题描述】:

总的来说,我仍在努力探索streams。我已经能够在form.on('part') 中使用multiparty 流式传输一个大文件。但我需要推迟调用并在读取流之前解析它。我试过PassThroughthroughthrough2,但得到了不同的结果,主要是挂起,我不知道该怎么做,也不知道调试的步骤。我对所有选择持开放态度。感谢您提供所有见解。

import multiparty from 'multiparty'
import 
  PassThrough
 from 'stream';
import through from 'through'
import through2 from 'through2'

export function promisedMultiparty(req) 
  return new Promise((resolve, reject) => 

    const form = new multiparty.Form()
    const form_files = []
    let q_str = ''

    form.on('field', (fieldname, value) => 
      if (value) q_str = appendQStr(fieldname, value, q_str)
    )

    form.on('part', async (part) => 
      if (part.filename) 

        const pass1 = new PassThrough() // this hangs at 10% 

        const pass2 = through(function write(data)  // this hangs from the beginning
            this.queue(data)
          ,
          function end() 
            this.queue(null)
          )

        const pass3 = through2() // this hangs at 10%

        /* 
            // This way works for large files, but I want to defer 
            // invocation

            const form_data = new FormData()
            form_data.append(savepath, part, 
              filename,
            )

            const r = request.post(url, 
              headers: 
                'transfer-encoding': 'chunked'
              
            , responseCallback(resolve))
            r._form = form

        */

        form_files.push(
          part: part.pipe(pass1),
          // part: part.pipe(pass2),
          // part: part.pipe(pass3),
        )

       else 
        part.resume()
      
    )

    form.on('close', () => 
      resolve(
        fields: qs.parse(q_str),
        forms: form_files,
      )
    )

    form.parse(req)
  )

附言如果有人可以使用适当的术语,请确保标题可能会更好。谢谢。

【问题讨论】:

我应该指出PassThroughthrough2 适用于较小的文件。 PassThroughthrough2 的行为方式是否相同(挂在 10%),因为它们基于 Stream2 你能再解释一下你想做什么吗? 感谢@F.bernal 的关注。我不想从form.on('part') 中发送请求,而是想用未读流解析,并在promise 解析的函数中,使用额外的上下文开始流式传输。 任何有关如何调试或公开流帖子 multiparty 的提示都会很棒。 如果我错了,请纠正我。您希望 multipart 通过为您提供文件的块和调用函数 promiseMultiparty 的点开始执行文件流式传输来完成其工作? 【参考方案1】:

我认为这是因为您没有正确使用 through2 - 即缓冲区已满后实际上并未清空(这就是为什么它在较大文件上挂起 10%,但在较小文件上工作)。

我相信这样的实现应该可以做到:

const pass2 = through2(function(chunk, encoding, next) 

   // do something with the data


   // Use this only if you want to send the data further to another stream reader 
   // Note - From your implementation you don't seem to need it
   // this.push(data)

   // This is what tells through2 it's ready to empty the 
   //  buffer and read more data
   next();
)

【讨论】:

那是缺少的逻辑。谢谢!

以上是关于NodeJs through处理流的主要内容,如果未能解决你的问题,请参考以下文章

从 nodejs 向 FFmpeg 发送 2 个流

使用 `:has_many :through` 记录关联处理复选框表单

为什么当一个流有错误时flatMap没有输出?

Hive Read & Write Flink

在NodeJS服务器上处理大量数据检索

通过模型关联对nodejs进行续集