NodeJs through处理流
Posted _成飞
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了NodeJs through处理流相关的知识,希望对你有一定的参考价值。
through2主要是基于streams2
(2指的是API稳定性)封装的transform stream
。其内部仅是封装了Transform的构造函数,以及更为易用的objectMode
模式。
through2
并未引用node默认提供的stream模块,而是使用社区中较为流行的readable-stream
模块,主要是为了对之前node版本做了兼容支持。
我们可以先看一段关于transform stream使用的基本示例:
const Transform = require(\'stream\'); const upperCaseTr = new Transform( transform(chunk, encoding, callback) // 转换为大写、push到可读流 this.push(chunk.toString().toUpperCase()); callback(); ); process.stdin.pipe(upperCaseTr).pipe(process.stdout);
对应的如果换成through2
写法。
process.stdin.pipe(through2(function(chunk, enc, callback) this.push(chunk.toString().toUpperCase()) callback() )).pipe(process.stdout);
接下来我们再看看对象模式,默认地,stream除了接收 Buffer/String 值。还有一个对象模式(objectMode
)的标识,我们可以设置以接受任意Javascript 对象。
const Transform = require(\'stream\'); const commaSplitter = new Transform( readableObjectMode: true, transform(chunk, encoding, callback) this.push(chunk.toString().trim().split(\',\')); callback(); ); const arrayToObject = new Transform( readableObjectMode: true, writableObjectMode: true, transform(chunk, encoding, callback) const obj = ; for(let i=0; i < chunk.length; i+=2) obj[chunk[i]] = chunk[i+1]; this.push(obj); callback(); ); const objectToString = new Transform( writableObjectMode: true, transform(chunk, encoding, callback) this.push(JSON.stringify(chunk) + \'\\n\'); callback(); ); process.stdin .pipe(commaSplitter) .pipe(arrayToObject) .pipe(objectToString) .pipe(process.stdout)
我们在终端输入:
$ a,b,c,d $ "a":"b","c":"d"
对应的如果换成through2
写法。
process.stdin .pipe(through2.obj(function (chunk, enc, callback) this.push(chunk.toString().trim().split(\',\')); callback(); )) .pipe(through2.obj(function (chunk, enc, callback) const obj = ; for(let i=0; i < chunk.length; i+=2) obj[chunk[i]] = chunk[i+1]; this.push(obj); callback(); )) .pipe(through2.obj(function (chunk, enc, callback) this.push(JSON.stringify(chunk) + \'\\n\'); callback(); )) .pipe(process.stdout)
pipe中间进行处理
var through2 = require(\'through2\'); var stream = through2(write,end) process.stdin .pipe(stream) .pipe(process.stdout); function write(line,_,next) this.push(line.toString().toUpperCase()) next(); ) function end(done) done(); )
如何推迟流读取调用
【中文标题】如何推迟流读取调用【英文标题】:How to defer stream read invocation 【发布时间】:2019-02-18 13:44:24 【问题描述】:总的来说,我仍在努力探索streams
。我已经能够在form.on('part')
中使用multiparty 流式传输一个大文件。但我需要推迟调用并在读取流之前解析它。我试过PassThrough
、through
。 through2
,但得到了不同的结果,主要是挂起,我不知道该怎么做,也不知道调试的步骤。我对所有选择持开放态度。感谢您提供所有见解。
import multiparty from 'multiparty'
import
PassThrough
from 'stream';
import through from 'through'
import through2 from 'through2'
export function promisedMultiparty(req)
return new Promise((resolve, reject) =>
const form = new multiparty.Form()
const form_files = []
let q_str = ''
form.on('field', (fieldname, value) =>
if (value) q_str = appendQStr(fieldname, value, q_str)
)
form.on('part', async (part) =>
if (part.filename)
const pass1 = new PassThrough() // this hangs at 10%
const pass2 = through(function write(data) // this hangs from the beginning
this.queue(data)
,
function end()
this.queue(null)
)
const pass3 = through2() // this hangs at 10%
/*
// This way works for large files, but I want to defer
// invocation
const form_data = new FormData()
form_data.append(savepath, part,
filename,
)
const r = request.post(url,
headers:
'transfer-encoding': 'chunked'
, responseCallback(resolve))
r._form = form
*/
form_files.push(
part: part.pipe(pass1),
// part: part.pipe(pass2),
// part: part.pipe(pass3),
)
else
part.resume()
)
form.on('close', () =>
resolve(
fields: qs.parse(q_str),
forms: form_files,
)
)
form.parse(req)
)
附言如果有人可以使用适当的术语,请确保标题可能会更好。谢谢。
【问题讨论】:
我应该指出PassThrough
和through2
适用于较小的文件。 PassThrough
和 through2
的行为方式是否相同(挂在 10%),因为它们基于 Stream2
?
你能再解释一下你想做什么吗?
感谢@F.bernal 的关注。我不想从form.on('part')
中发送请求,而是想用未读流解析,并在promise 解析的函数中,使用额外的上下文开始流式传输。
任何有关如何调试或公开流帖子 multiparty
的提示都会很棒。
如果我错了,请纠正我。您希望 multipart 通过为您提供文件的块和调用函数 promiseMultiparty 的点开始执行文件流式传输来完成其工作?
【参考方案1】:
我认为这是因为您没有正确使用 through2
- 即缓冲区已满后实际上并未清空(这就是为什么它在较大文件上挂起 10%,但在较小文件上工作)。
我相信这样的实现应该可以做到:
const pass2 = through2(function(chunk, encoding, next)
// do something with the data
// Use this only if you want to send the data further to another stream reader
// Note - From your implementation you don't seem to need it
// this.push(data)
// This is what tells through2 it's ready to empty the
// buffer and read more data
next();
)
【讨论】:
那是缺少的逻辑。谢谢!以上是关于NodeJs through处理流的主要内容,如果未能解决你的问题,请参考以下文章