连接两个(或 n 个)流

Posted

技术标签:

【中文标题】连接两个(或 n 个)流【英文标题】:Concatenate two (or n) streams 【发布时间】:2013-05-02 02:55:39 【问题描述】:

2 流:

鉴于可读的streamsstream1stream2获取包含stream1stream2 连接的流的惯用(简洁)方法是什么?

我不能stream1.pipe(outStream); stream2.pipe(outStream),因为这样流的内容就混在一起了。

n 个流:

给定一个EventEmitter,它发出不确定数量的流,例如

eventEmitter.emit('stream', stream1)
eventEmitter.emit('stream', stream2)
eventEmitter.emit('stream', stream3)
...
eventEmitter.emit('end')

获取所有流连接在一起的流的惯用(简洁)方式是什么?

【问题讨论】:

【参考方案1】:

您也许可以使其更简洁,但这是一个可行的方法:

var util = require('util');
var EventEmitter = require('events').EventEmitter;

function ConcatStream(streamStream) 
  EventEmitter.call(this);
  var isStreaming = false,
    streamsEnded = false,
    that = this;

  var streams = [];
  streamStream.on('stream', function(stream)
    stream.pause();
    streams.push(stream);
    ensureState();
  );

  streamStream.on('end', function() 
    streamsEnded = true;
    ensureState();
  );

  var ensureState = function() 
    if(isStreaming) return;
    if(streams.length == 0) 
      if(streamsEnded)
        that.emit('end');
      return;
    
    isStreaming = true;
    streams[0].on('data', onData);
    streams[0].on('end', onEnd);
    streams[0].resume();
  ;

  var onData = function(data) 
    that.emit('data', data);
  ;

  var onEnd = function() 
    isStreaming = false;
    streams[0].removeAllListeners('data');
    streams[0].removeAllListeners('end');
    streams.shift();
    ensureState();
  ;


util.inherits(ConcatStream, EventEmitter);

我们使用streams(流队列;push 位于后面,shift 位于前面)、isStreamingstreamsEnded 来跟踪状态。当我们得到一个新的流时,我们推送它,当一个流结束时,我们停止监听并转移它。当流结束时,我们设置streamsEnded

在每个事件中,我们检查我们所处的状态。如果我们已经在流式传输(管道传输),我们什么也不做。如果队列为空并且设置了streamsEnded,我们将发出end 事件。如果队列中有东西,我们会恢复它并监听它的事件。

*请注意,pauseresume 是建议性的,因此某些流可能无法正确运行,并且需要缓冲。这个练习留给读者。

完成所有这些后,我将通过构造一个EventEmitter 来处理n=2 的情况,用它创建一个ConcatStream,并发出两个stream 事件,然后是一个end 事件。我敢肯定它可以做得更简洁,但我们不妨使用我们所拥有的。

【讨论】:

谢谢亚伦!我有点希望有一些现有的库,所以我可以用三行来解决它。如果没有,我想我可能会将您的解决方案提取到一个包中。我可以在 MIT 许可下使用你的代码吗? 啊,找到stream-stream库了。看我的回答。 @JoLiss 我也先找了一些东西,但我没有找到那个选项。如果您仍然愿意,当然可以在库中使用我的代码。【参考方案2】:

combined-stream 包连接流。自述文件中的示例:

var CombinedStream = require('combined-stream');
var fs = require('fs');

var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

combinedStream.pipe(fs.createWriteStream('combined.txt'));

我相信您必须一次附加所有流。如果队列为空,combinedStream 将自动结束。见issue #5。

stream-stream 库是具有显式 .end 的替代方案,但它不太受欢迎,而且可能没有经过充分测试。它使用 Node 0.10 的 streams2 API(参见this discussion)。

【讨论】:

combined-stream 包已经支持在回调函数中添加源流,因此您不必在开始时启动它们,这有助于节省内存、文件描述符等。此外,还有很多更受欢迎的库 multistream 似乎经过更多测试【参考方案3】:

streamee.js 是一组基于 node 1.0+ 流的流转换器和作曲家,包括一个连接方法:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);

【讨论】:

谢谢,我去看看。我假设那是 Node 0.10? 是 Node 0.10,但您可以将旧式流包装成 README 中所写的 0.10+ 流【参考方案4】:

https://github.com/joepie91/node-combined-stream2 是组合流模块(如上文所述)的插入式 Streams2 兼容替代品。它自动包装 Streams1 流。

combined-stream2 的示例代码:

var CombinedStream = require('combined-stream2');
var fs = require('fs');

var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));

combinedStream.pipe(fs.createWriteStream('combined.txt'));

【讨论】:

【参考方案5】:

这可以用 vanilla nodejs 完成

import  PassThrough  from 'stream'
const merge = (...streams) => 
    let pass = new PassThrough()
    let waiting = streams.length
    for (let stream of streams) 
        pass = stream.pipe(pass, end: false)
        stream.once('end', () => --waiting === 0 && pass.emit('end'))
    
    return pass

【讨论】:

如果一个流永远不会结束,而另一个流会怎样 只是更新pass.emit('end') 不起作用。试试pass.end() --waiting 更改为 waiting-- @TomLarkworthy 这是对@PirateApp 的回应吗?如果不是,那么我不明白为什么要这样做,因为这会导致最后一个流永远不会结束 这个解决方案很好,但由于某些奇怪的原因,我的使用中没有保持流的顺序。调用merge(a, b) 产生了一个流,其中ba 之前。是否与ba 相比,它的项目流少得多,并且先结束这一事实有关?【参考方案6】:

如果您不关心流中数据的顺序,在nodejs 中进行简单的reduce 操作应该没问题!

const PassThrough = require('stream')

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => 
  s.pipe(pt, end: false)
  s.once('end', () => a.every(s => s.ended) && pt.emit('end'))
  return pt
, new PassThrough())

干杯 ;)

【讨论】:

你不应该从reduce返回一些东西吗?这看起来像 joined 将是未定义的。 警告:这将导致所有流并行传输到 PassThrough 流,而不考虑数据的顺序,很可能会损坏您的数据。 @LeonLi 这确实是这种方法的目的。如果您想保留顺序,您可以将不同于 PassThrough 的初始值传递给您的 reduce 函数;) @Ivo 这个问题询问的是 concatenation。因此,大多数到达此 QA 的读者都会关心订购。这个答案默默地误导了那些读者,因为流成功地通过了,但除非你检查输出,否则你永远不会知道它也会混淆你的所有数据(这个问题首先特别要求避免!)。我敦促您将此信息添加到答案正文中。 没有stream.ended 这样的东西。您必须在结束事件处理程序中设置 s.ended = true【参考方案7】:

在 vanilla nodejs 中使用 ECMA 15+ 并结合 IvoFeng​​strong> 的好答案。

PassThrough 类是一个普通的Transform 流,它不会以任何方式修改流。

const  PassThrough  = require('stream');

const concatStreams = (streamArray, streamCounter = streamArray.length) => streamArray
  .reduce((mergedStream, stream) => 
    // pipe each stream of the array into the merged stream
    // prevent the automated 'end' event from firing
    mergedStream = stream.pipe(mergedStream,  end: false );
    // rewrite the 'end' event handler
    // Every time one of the stream ends, the counter is decremented.
    // Once the counter reaches 0, the mergedstream can emit its 'end' event.
    stream.once('end', () => --streamCounter === 0 && mergedStream.emit('end'));
    return mergedStream;
  , new PassThrough());

可以这样使用:

const mergedStreams = concatStreams([stream1, stream2, stream3]);

【讨论】:

这会在流完成之前对流进行管道化,将它们混杂在一起;这正是最初的问题要避免的问题 - 如何连接而不是混乱流。 为了避免这种情况,你应该在前一个触发'end'事件之后stream.pipe下一个【参考方案8】:

下面的代码对我有用:)。已从之前给出的所有答案中获取输入

  const pipeStreams = (streams) => 
  const out = new PassThrough()
  // Piping the first stream to the out stream
  // Also prevent the automated 'end' event of out stream from firing
  streams[0].pipe(out,  end: false )
  for (let i = 0; i < streams.length - 2; i++) 
    // On the end of each stream (until the second last) pipe the next stream to the out stream
    // Prevent the automated 'end' event of out stream from firing
    streams[i].on('end', () => 
      streams[i + 1].pipe(out,  end: false )
    )
  
  // On the end of second last stream pipe the last stream to the out stream.
  // Don't prevent the 'end flag from firing'
  streams[streams.length - 2].on('end', () => 
    streams[streams.length - 1].pipe(out)
  )
  return out
 

【讨论】:

【参考方案9】:

这里两个最受好评的答案都不适用于异步流,因为它们只是通过管道传输内容,而不管源流是否已准备好生成。我必须将内存中的字符串流与来自数据库的数据馈送相结合,并且数据库内容始终位于结果流的末尾,因为它需要一秒钟才能获得数据库响应。这就是我最终为我的目的而写的内容。

export function joinedStream(...streams: Readable[]): Readable 
  function pipeNext(): void 
    const nextStream = streams.shift();
    if (nextStream) 
      nextStream.pipe(out,  end: false );
      nextStream.on('end', function() 
        pipeNext();
      );
     else 
      out.end();
    
  
  const out = new PassThrough();
  pipeNext();
  return out;

【讨论】:

【参考方案10】:

现在可以使用异步迭代器轻松完成此操作

async function* concatStreams(readables) 
  for (const readable of readables) 
    for await (const chunk of readable)  yield chunk 
  
 

你可以这样使用它

const fs = require('fs')
const stream = require('stream')

const files = ['file1.txt', 'file2.txt', 'file3.txt'] 
const iterable = await concatStreams(files.map(f => fs.createReadStream(f)))

// convert the async iterable to a readable stream
const mergedStream = stream.Readable.from(iterable)

有关异步迭代器的更多信息:https://2ality.com/2019/11/nodejs-streams-async-iteration.html

【讨论】:

createReadStream 返回的流不可迭代。 你的意思是mergedStream吗?因为我可以毫无问题地迭代它gist.github.com/ducaale/5e3fd00a70487c98333e5fb42bc4b624 如果不需要订单,ss可以等待所有的,以便并行运行,从而更快? 当然,您可以通过await readable.next() 手动从两个异步迭代器中获取下一个项目,然后获取第一个解析的项目。 这个选项在我看来是最好的,但打字稿实现必须在签名上添加...,像这样ts async function* concatStreams(...readables)

以上是关于连接两个(或 n 个)流的主要内容,如果未能解决你的问题,请参考以下文章

bzoj4930棋盘 费用流

BZOJ4930棋盘 拆边费用流

Kotlin 协程Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )

Kotlin 协程Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )

[CQOI2012]交换棋子(最小费用最大流)

1305. [CQOI2009]跳舞最大流+二分