将数据管道传输到尚未准备好接收数据的可写流

Posted

技术标签:

【中文标题】将数据管道传输到尚未准备好接收数据的可写流【英文标题】:Piping data to writable stream that is not ready to receive data yet 【发布时间】:2016-08-02 13:42:37 【问题描述】:

有没有办法将可读流连接到 Node.js 中的可写流,其中可写流尚未准备好接收数据?换句话说,我想将可读与可写连接起来,但我想在程序的稍后位置初始化可写,包括定义写方法。也许我们必须实现 write 方法,但是有没有办法以类似于暂停可读流的方式暂停可写流?或者,也许我们可以使用中间直通/转换流并在那里缓冲数据,然后再将数据通过管道传输到可写对象!

例如,通常我们会这样做:

readable.pipe(transform).pipe(writable);

但我想做类似的事情:

const tstrm = readable.pipe(transform);

doSomethingAsync().then(function()

      tstrm.pipe(writable);

);

只是想知道这是否可能以及如何正确地做到这一点,到目前为止都无法弄清楚。

我想我希望在中间转换流中缓冲数据,然后再将其连接/传送到可写流,然后,一旦连接,在任何新数据之前先流式传输缓冲的数据。似乎是一个合理的做法,找不到任何关于此的信息。

【问题讨论】:

看起来这里接受的答案接近我正在寻找的***.com/questions/20317759/… 【参考方案1】:

注意,我在这里使用间隔来模拟作者是否能够阅读。您可以以任何您想要的方式执行此操作,即如果作者返回 false,您将更新状态以开始缓冲等。我认为最后一行是您想要的,即

r.pipe(b).pipe(w);

内容如下

readStrem.pipe(transformBbuffer).pipe(writeStream);

示例代码,我们可以进行一些更改来缓冲所有数据。我将在代码之后进行描述。您需要了解的有关流的所有信息以及更多信息都在文档中,我认为它们可以使用更完整的示例来完成,但它们已经相当不错了...

https://nodejs.org/api/stream.html#stream_class_stream_transform_1

这是代码。

var fs     = require('fs');
var stream = require('stream')
const util = require('util');
//const StringDecoder = require('string_decoder').StringDecoder;
const Transform = require('stream').Transform;
var check_buff  = 0;
var DRAIN_ME    = 0;

var r = fs.createReadStream('file1.txt').setEncoding('utf8');
var w = fs.createWriteStream('file2.txt');

var BufferStream = function () 
  stream.Transform.apply(this, arguments);
  this.buffer = []; 
;

util.inherits(BufferStream, stream.Transform);

var intId;
intId = setInterval(function()
  if(check_buff % 3 == 0) 
    DRAIN_ME = 1;
    return;
  
  DRAIN_ME = 0;
,10);  

BufferStream.prototype._transform = function (chunk, encoding, done) 
  this.buffer.push(String(chunk));
  while(DRAIN_ME > 0 && this.buffer.length > 0) 
    this.push(this.buffer.shift());
  
  console.log(chunk.length);
  console.log(this.buffer.length);
  done();
;

var b = new BufferStream();
b.on('end', function(chunk) 
  clearInterval(intId);
);
r.pipe(b).pipe(w);

我正在寻找实现转换/直通的规范方法 流,它会缓冲所有数据,直到调用管道为止。

进行以下更改

BufferStream.prototype._transform = function (chunk, encoding, done) 
  this.buffer.push(String(chunk));

  console.log(chunk.length);
  console.log(this.buffer.length);
  done();
;
......
BufferStream.prototype._flush = function (cb) 
  var len = this.buffer.length;
  for (var i = 0; i < len; i++) 
    this.push(this.buffer.shift());
  ;
  cb();
;

您还可以暂停可读流,这实际上会暂停可写流,因为它停止接收数据,即...

要测试这个,在磁盘上创建一个相当大的文件,即 100MB 或更多,然后运行这个...

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');

readableStream.setEncoding('utf8');

readableStream.on('data', function(chunk) 
  var ready = 0;
  readableStream.pause();
  setInterval(function()
    if(ready == 0) 
      //console.log('pausing');
      readableStream.pause();
      ready = 1;
       
    else 
      //console.log('resuming');
      readableStream.resume();
      ready = 0;
       
  ,100);  
  writableStream.write(chunk);
);

立即暂停的原因是在间隔触发时10ms 文件可能已经被写入。这有一些变化,即...

var fs = require('fs');
var readableStream = fs.createReadStream('file1.txt');
var writableStream = fs.createWriteStream('file2.txt');
readableStream.setEncoding('utf8');

var ready = 0;
setInterval(function()
  if(ready == 0) 
    //console.log('pausing');
    readableStream.pause();
    ready = 1;
  
  else 
    //console.log('resuming');
    readableStream.resume();
    ready = 0;
  
,100);  

readableStream.on('data', function(chunk) 
  writableStream.write(chunk);
  readableStream.pause();
);

【讨论】:

谢谢,我可以暂停可读的,这并不难,但是在我们等待可写准备好接收数据的同时使用直通/转换流来缓冲数据怎么样?

以上是关于将数据管道传输到尚未准备好接收数据的可写流的主要内容,如果未能解决你的问题,请参考以下文章

NodeJS 可写流 writev 在 1 和 highWaterMark 块之间交替

Node.js 可写流:write vs _write

Node.js 将相同的可读流输送到多个(可写)目标中

了解ipc通信的linux管道生命线

Node.js 可写流创建错误文件(更大且不可读)

记一次数据库奇怪问题 “在从服务器接收结果时发生传输级错误。 (provider: 共享内存提供程序, error: 0 - 管道的另一端上无任何进程。)"...