如何在node.js中将数组值作为可读流发出/管道?

Posted

技术标签:

【中文标题】如何在node.js中将数组值作为可读流发出/管道?【英文标题】:How to emit/pipe array values as a readable stream in node.js? 【发布时间】:2013-05-26 18:49:25 【问题描述】:

从数组和管道值创建可读流到可写流的最佳方法是什么?我已经看到子堆栈的 example 使用 setInterval 并且我可以使用 0 作为间隔值成功地实现它,但是我正在迭代大量数据并且每次触发 gc 都会减慢速度。

// Working with the setInterval wrapper
var arr = [1, 5, 3, 6, 8, 9];

function createStream () 
    var t = new stream;
    t.readable = true;
    var times = 0;
    var iv = setInterval(function () 
        t.emit('data', arr[times]);
        if (++times === arr.length) 
            t.emit('end');
            clearInterval(iv);
        
    
, 0);

// Create the writable stream s
// ....

createStream().pipe(s);

我想做的是在没有 setInterval 的情况下发出值。也许像这样使用异步模块:

async.forEachSeries(arr, function(item, cb) 
    t.emit('data', item);
    cb();
, function(err) 
 if (err) 
     console.log(err);
 
 t.emit('end');
);

在这种情况下,我迭代数组并发出数据,但从不管道任何值。我已经看过 shinout 的 ArrayStream,但我认为它是在 v0.10 之前创建的,它的开销比我想要的要多。

【问题讨论】:

我认为您获得的开销不会比 ArrayStream (110 sloc) 少得多。 async 在使用 setImmediate 时将类似于 substack 的示例。我认为您不需要为每个数据事件设置 setImmediate/setInterval,因为您没有执行 IO,但您需要处理 ArrayStream 为您执行的暂停/恢复。很想知道你得到了什么答案。 感谢您的意见。我想我对 ArrayStream 最大的担忧是,自从 v0.10 中的 Stream API 发生变化以来,它还没有更新,但这些担忧可能是没有根据的。我很惊讶它的下载量如此之少,这让我相信其他人的做法有所不同。 请考虑不接受已接受的答案并接受一个推荐标准Readable.from 【参考方案1】:

我为此使用了ArrayStream。它确实解决了 GC 被频繁触发的问题。我收到来自节点的递归 process.nextTick 的警告,因此将 ArrayStream 中的 nextTick 回调修改为 setImmediate 并修复了警告并且似乎运行良好。

【讨论】:

【参考方案2】:

这是一个老问题,但如果有人偶然发现,node-stream-array 是 Node.js >= v0.10 的一个更简单、更优雅的实现

var streamify = require('stream-array'),
  os = require('os');

streamify(['1', '2', '3', os.EOL]).pipe(process.stdout);

【讨论】:

不幸的是,node-stream-array 没有返回真正的可读,这可能会导致代码的其他部分出现问题(例如:与 promisepipe 一起使用时缺少破坏函数)【参考方案3】:

您可以通过创建可读流并将值推入其中来解决此问题。

流是一种痛苦,但通常easier 到work with them directly 而不是使用库。

要流式传输的字符串或缓冲区数组

如果您使用的是字符串或缓冲区数组,这将起作用:

'use strict'
const Stream = require('stream')
const readable = new Stream.Readable()

readable.pipe(process.stdout)

const items = ['a', 'b', 'c']
items.forEach(item => readable.push(item))

// no more data
readable.push(null)

注意事项:

readable.pipe(process.stdout) 做了两件事:将流置于“流动”模式并设置 process.stdout 可写流以接收来自 readable 的数据 Readable#push 方法适用于可读流的创建者,而不是流使用者。 您必须通过Readable#push(null) 发出没有更多数据的信号。

要流式传输的非字符串数组

要从既不是字符串也不是缓冲区的数组中创建一个流,您需要可读流和可写流都在"Object Mode" 中。在下面的示例中,我进行了以下更改:

objectMode: true初始化可读流

不是通过管道连接到process.stdout,而是通过管道连接到处于对象模式的简单可写流。

  'use strict'
  const Stream = require('stream')

  const readable = new Stream.Readable(objectMode: true)

  const writable = new Stream.Writable(objectMode: true)
  writable._write = (object, encoding, done) => 
    console.log(object)

    // ready to process the next chunk
    done()
  

  readable.pipe(writable)

  const items = [1, 2, 3]
  items.forEach(item => readable.push(item))

  // end the stream
  readable.push(null)

性能说明

数据从何而来?如果是流数据源,最好使用转换流来操作流,而不是转换为数组。

【讨论】:

我们如何链接您上一个示例的可写内容? @Gura,你想做什么? readable.pipe(writable).pipe(somethingElse)?如果是这样,writable 将必须是一个转换流。如果这是您要查找的内容,我可以添加相关信息。 我用过Transform,效果很好!谢谢@mheiber 很棒的答案,尤其是您链接到的第一个资源。我发现它非常彻底和有用。 可能缺少实现可读流的_read()。请参阅此处以获取类似的答案***.com/a/22085851/2012945(字符串到流)。【参考方案4】:

tl;博士;

这是一个 LIFO 解决方案。 Array.prototype.pop() 具有与 shift 类似的行为,但应用于数组中的最后一个元素。

const items = [1,2,3]
const stream = new Readable(
  objectMode: true,
  read() 
    const item = items.pop()
    if (!item) 
      this.push(null);
      return;
    
    this.push(item)
  ,
)

【讨论】:

我们应该不是shift(),按顺序发送数组,而不是pop() 这是一个 LIFO 解决方案。 Array.prototype.pop() 具有与 shift 类似的行为,但应用于数组中的最后一个元素。 您会编辑答案以澄清这一点吗?因为所有其他答案,以及我自己对这个问题的期望,都围绕 FIFO。【参考方案5】:

从 Node 12.3 开始,您可以改用 stream.Readable.from(iterable, [options])

const  Readable  = require('stream');
const readableStream = Readable.from(arr);

【讨论】:

它应该适用于Iterable——你有一个例子说明它不适用于对象数组吗? 哦,你是对的。我在我正在做的事情上尝试过它,但它没有用。我认为这是因为 Readable 不在objectMode 中。但我只是创建了一个简单的示例并且它有效。我会删除我的评论。

以上是关于如何在node.js中将数组值作为可读流发出/管道?的主要内容,如果未能解决你的问题,请参考以下文章

无法使用一个可读流写入 Node JS 中的两个不同目标

从 JavaScript 对象创建 Node.js 可读流——最简单的方法

Node.js 将相同的可读流输送到多个(可写)目标中

深入node.js 3 模板引擎原理 事件 文件操作 可读流的实现原理

在NodeJS中将可读流关闭到FIFO

管道传输到可写流时暂停可读流