_read() 未在可读流上实现

Posted

技术标签:

【中文标题】_read() 未在可读流上实现【英文标题】:_read() is not implemented on Readable stream 【发布时间】:2018-08-25 08:10:01 【问题描述】:

这个问题是如何真正实现一个可读流的read方法。

我有一个可读流的实现:

import Readable from "stream";
this.readableStream = new Readable();

我收到了这个错误

events.js:136 投掷者; // 未处理的“错误”事件 ^

错误 [ERR_STREAM_READ_NOT_IMPLEMENTED]:_read() 未实现 在 Readable._read (_stream_readable.js:554:22) 在 Readable.read (_stream_readable.js:445:10) 在 resume_ (_stream_readable.js:825:12) 在 _combinedTickCallback (内部/进程/next_tick.js:138:11) 在 process._tickCallback (internal/process/next_tick.js:180:9) 在 Function.Module.runMain (module.js:684:11) 启动时(bootstrap_node.js:191:16) 在 bootstrap_node.js:613:3

错误发生的原因很明显,我们需要这样做:

  this.readableStream = new Readable(
      read(size) 
        return true;
      
    );

虽然我不太明白如何实现 read 方法。

唯一有效的方法就是调用

this.readableStream.push('some string or buffer');

如果我尝试做这样的事情:

   this.readableStream = new Readable(
          read(size) 
            this.push('foo');   // call push here!
            return true;
          
     );

然后什么都没有发生 - 没有任何东西来自可读!

此外,这些文章说您不需要实现 read 方法:

https://github.com/substack/stream-handbook#creating-a-readable-stream

https://medium.freecodecamp.org/node-js-streams-everything-you-need-to-know-c9141306be93

我的问题是 - 为什么在 read 方法中调用 push 什么都不做?唯一对我有用的就是在别处调用 readable.push()。

【问题讨论】:

【参考方案1】:

readableStream 就像一个水池:

.push(data),就像把水抽到水池里。 .pipe(destination),就像将水池连接到管道上,然后将水抽到其他地方 _read(size) 作为抽水机运行,控制水流的多少以及数据结束的时间。

fs.createReadStream() 将创建读取流,_read() 函数已自动实现以推送文件数据并在文件结束时结束。

当池连接到管道时,_read(size) 会自动触发。因此,如果您在没有连接到目的地的方式的情况下强制调用此函数,它将泵到?哪里?并且会影响_read()里面的机器状态(可能是光标移到了错误的地方,...)

read() 函数必须在 new Stream.Readable() 中创建。它实际上是对象内部的一个函数。不是 readableStream.read(),实现 readableStream.read=function(size)... 也不行。

理解实现的简单方法:

var Reader=new Object();
Reader.read=function(size)
    if (this.i==null)this.i=1;elsethis.i++;
    this.push("abc");
    if (this.i>7) this.push(null); 


const Stream = require('stream');
const renderStream = new Stream.Readable(Reader);

renderStream.pipe(process.stdout)

您可以使用它来重新渲染任何流数据以 POST 到其他服务器。 使用 Axios POST 流数据:

require('axios')(
    method: 'POST',
    url: 'http://127.0.0.1:3000',
    headers: 'Content-Length': 1000000000000,
    data: renderStream
);

【讨论】:

【参考方案2】:

来自文档:

可读._read:

“当调用 readable._read() 时,如果资源中的数据可用,则实现应该开始使用 this.push(dataChunk) 方法将该数据推送到读取队列中。link”

可读.push:

" readable.push() 方法只能由 Readable 实现者调用,并且只能在 readable._read() 方法中调用。link"

【讨论】:

【参考方案3】:

在您的 ReadableStream 初始化后实现 _read 方法:

import Readable from "stream";
this.readableStream = new Readable();
this.readableStream.read = function () ;

【讨论】:

【参考方案4】:

为什么在 read 方法中调用 push 什么都不做?唯一对我有用的就是在别处调用 readable.push()。

我认为这是因为您没有使用它,您需要将其通过管道传输到可写流(例如 stdout),或者只是通过 data 事件使用它:

const  Readable  = require("stream");

let count = 0;
const readableStream = new Readable(
    read(size) 
        this.push('foo');
        if (count === 5) this.push(null);
        count++;
    
);

// piping
readableStream.pipe(process.stdout)

// through the data event
readableStream.on('data', (chunk) => 
  console.log(chunk.toString());
);

它们都应该打印 5 次 foo(尽管它们略有不同)。您应该使用哪一个取决于您要完成的任务。

此外,这些文章说您不需要实现 read 方法:

你可能不需要它,这应该可以:

const  Readable  = require("stream");

const readableStream = new Readable();

for (let i = 0; i <= 5; i++) 
    readableStream.push('foo');

readableStream.push(null);

readableStream.pipe(process.stdout)

在这种情况下,您无法通过data 事件捕获它。另外,我想说这种方式不是很有用而且效率不高,我们只是一次将所有数据推送到流中(如果它很大,所有数据都将在内存中),然后使用它。

【讨论】:

我一直在消费它,readable.on('data', function()); 我扩展了解释,如果你想这样做你确实需要实现read方法! 试试你的第二个例子 - 它应该抛出一个错误,说你需要实现 _read()... 它对我有用,你要关闭它推送null吗?

以上是关于_read() 未在可读流上实现的主要内容,如果未能解决你的问题,请参考以下文章

可读的高水印有用性

KnockoutJS:模板未在可观察数组更改时更新(仅在添加时,在删除时有效)

Spark Streaming:如何在流上加载管道?

jq实现 按钮点击一次后 3秒后在可点击

为什么IO多路复用需要采用非阻塞式IO

内核中读写信号量(rwsem)实现的关键注释