如何推迟流读取调用

Posted

技术标签:

【中文标题】如何推迟流读取调用【英文标题】:How to defer stream read invocation 【发布时间】:2019-02-18 13:44:24 【问题描述】:

总的来说,我仍在努力探索streams。我已经能够在form.on('part') 中使用multiparty 流式传输一个大文件。但我需要推迟调用并在读取流之前解析它。我试过PassThroughthroughthrough2,但得到了不同的结果,主要是挂起,我不知道该怎么做,也不知道调试的步骤。我对所有选择持开放态度。感谢您提供所有见解。

import multiparty from 'multiparty'
import 
  PassThrough
 from 'stream';
import through from 'through'
import through2 from 'through2'

export function promisedMultiparty(req) 
  return new Promise((resolve, reject) => 

    const form = new multiparty.Form()
    const form_files = []
    let q_str = ''

    form.on('field', (fieldname, value) => 
      if (value) q_str = appendQStr(fieldname, value, q_str)
    )

    form.on('part', async (part) => 
      if (part.filename) 

        const pass1 = new PassThrough() // this hangs at 10% 

        const pass2 = through(function write(data)  // this hangs from the beginning
            this.queue(data)
          ,
          function end() 
            this.queue(null)
          )

        const pass3 = through2() // this hangs at 10%

        /* 
            // This way works for large files, but I want to defer 
            // invocation

            const form_data = new FormData()
            form_data.append(savepath, part, 
              filename,
            )

            const r = request.post(url, 
              headers: 
                'transfer-encoding': 'chunked'
              
            , responseCallback(resolve))
            r._form = form

        */

        form_files.push(
          part: part.pipe(pass1),
          // part: part.pipe(pass2),
          // part: part.pipe(pass3),
        )

       else 
        part.resume()
      
    )

    form.on('close', () => 
      resolve(
        fields: qs.parse(q_str),
        forms: form_files,
      )
    )

    form.parse(req)
  )

附言如果有人可以使用适当的术语,请确保标题可能会更好。谢谢。

【问题讨论】:

我应该指出PassThroughthrough2 适用于较小的文件。 PassThroughthrough2 的行为方式是否相同(挂在 10%),因为它们基于 Stream2 你能再解释一下你想做什么吗? 感谢@F.bernal 的关注。我不想从form.on('part') 中发送请求,而是想用未读流解析,并在promise 解析的函数中,使用额外的上下文开始流式传输。 任何有关如何调试或公开流帖子 multiparty 的提示都会很棒。 如果我错了,请纠正我。您希望 multipart 通过为您提供文件的块和调用函数 promiseMultiparty 的点开始执行文件流式传输来完成其工作? 【参考方案1】:

我认为这是因为您没有正确使用 through2 - 即缓冲区已满后实际上并未清空(这就是为什么它在较大文件上挂起 10%,但在较小文件上工作)。

我相信这样的实现应该可以做到:

const pass2 = through2(function(chunk, encoding, next) 

   // do something with the data


   // Use this only if you want to send the data further to another stream reader 
   // Note - From your implementation you don't seem to need it
   // this.push(data)

   // This is what tells through2 it's ready to empty the 
   //  buffer and read more data
   next();
)

【讨论】:

那是缺少的逻辑。谢谢!

以上是关于如何推迟流读取调用的主要内容,如果未能解决你的问题,请参考以下文章

调用 IOUtils.copy 后如何重新读取 InputStream?

IMFSourceReader.ReadSample 在读取流 1 后从不触发回调。对流 0 的调用工作正常

如果调用了推迟输入转换,则永远不会观察到流的“asLiveData()”

使用java优化oracle

使用流异步读取文件时如何同步处理每一行/缓冲区[重复]

httpServletRequest中的流只能读取一次的原因