如何推迟流读取调用
Posted
技术标签:
【中文标题】如何推迟流读取调用【英文标题】:How to defer stream read invocation 【发布时间】:2019-02-18 13:44:24 【问题描述】:总的来说,我仍在努力探索streams
。我已经能够在form.on('part')
中使用multiparty 流式传输一个大文件。但我需要推迟调用并在读取流之前解析它。我试过PassThrough
、through
。 through2
,但得到了不同的结果,主要是挂起,我不知道该怎么做,也不知道调试的步骤。我对所有选择持开放态度。感谢您提供所有见解。
import multiparty from 'multiparty'
import
PassThrough
from 'stream';
import through from 'through'
import through2 from 'through2'
export function promisedMultiparty(req)
return new Promise((resolve, reject) =>
const form = new multiparty.Form()
const form_files = []
let q_str = ''
form.on('field', (fieldname, value) =>
if (value) q_str = appendQStr(fieldname, value, q_str)
)
form.on('part', async (part) =>
if (part.filename)
const pass1 = new PassThrough() // this hangs at 10%
const pass2 = through(function write(data) // this hangs from the beginning
this.queue(data)
,
function end()
this.queue(null)
)
const pass3 = through2() // this hangs at 10%
/*
// This way works for large files, but I want to defer
// invocation
const form_data = new FormData()
form_data.append(savepath, part,
filename,
)
const r = request.post(url,
headers:
'transfer-encoding': 'chunked'
, responseCallback(resolve))
r._form = form
*/
form_files.push(
part: part.pipe(pass1),
// part: part.pipe(pass2),
// part: part.pipe(pass3),
)
else
part.resume()
)
form.on('close', () =>
resolve(
fields: qs.parse(q_str),
forms: form_files,
)
)
form.parse(req)
)
附言如果有人可以使用适当的术语,请确保标题可能会更好。谢谢。
【问题讨论】:
我应该指出PassThrough
和through2
适用于较小的文件。 PassThrough
和 through2
的行为方式是否相同(挂在 10%),因为它们基于 Stream2
?
你能再解释一下你想做什么吗?
感谢@F.bernal 的关注。我不想从form.on('part')
中发送请求,而是想用未读流解析,并在promise 解析的函数中,使用额外的上下文开始流式传输。
任何有关如何调试或公开流帖子 multiparty
的提示都会很棒。
如果我错了,请纠正我。您希望 multipart 通过为您提供文件的块和调用函数 promiseMultiparty 的点开始执行文件流式传输来完成其工作?
【参考方案1】:
我认为这是因为您没有正确使用 through2
- 即缓冲区已满后实际上并未清空(这就是为什么它在较大文件上挂起 10%,但在较小文件上工作)。
我相信这样的实现应该可以做到:
const pass2 = through2(function(chunk, encoding, next)
// do something with the data
// Use this only if you want to send the data further to another stream reader
// Note - From your implementation you don't seem to need it
// this.push(data)
// This is what tells through2 it's ready to empty the
// buffer and read more data
next();
)
【讨论】:
那是缺少的逻辑。谢谢!以上是关于如何推迟流读取调用的主要内容,如果未能解决你的问题,请参考以下文章
调用 IOUtils.copy 后如何重新读取 InputStream?
IMFSourceReader.ReadSample 在读取流 1 后从不触发回调。对流 0 的调用工作正常