连接两个(或 n 个)流
Posted
技术标签:
【中文标题】连接两个(或 n 个)流【英文标题】:Concatenate two (or n) streams 【发布时间】:2013-05-02 02:55:39 【问题描述】:2 流:
鉴于可读的streamsstream1
和stream2
,获取包含stream1
和stream2
连接的流的惯用(简洁)方法是什么?
我不能stream1.pipe(outStream); stream2.pipe(outStream)
,因为这样流的内容就混在一起了。
n 个流:
给定一个EventEmitter,它发出不确定数量的流,例如
eventEmitter.emit('stream', stream1)
eventEmitter.emit('stream', stream2)
eventEmitter.emit('stream', stream3)
...
eventEmitter.emit('end')
获取所有流连接在一起的流的惯用(简洁)方式是什么?
【问题讨论】:
【参考方案1】:您也许可以使其更简洁,但这是一个可行的方法:
var util = require('util');
var EventEmitter = require('events').EventEmitter;
function ConcatStream(streamStream)
EventEmitter.call(this);
var isStreaming = false,
streamsEnded = false,
that = this;
var streams = [];
streamStream.on('stream', function(stream)
stream.pause();
streams.push(stream);
ensureState();
);
streamStream.on('end', function()
streamsEnded = true;
ensureState();
);
var ensureState = function()
if(isStreaming) return;
if(streams.length == 0)
if(streamsEnded)
that.emit('end');
return;
isStreaming = true;
streams[0].on('data', onData);
streams[0].on('end', onEnd);
streams[0].resume();
;
var onData = function(data)
that.emit('data', data);
;
var onEnd = function()
isStreaming = false;
streams[0].removeAllListeners('data');
streams[0].removeAllListeners('end');
streams.shift();
ensureState();
;
util.inherits(ConcatStream, EventEmitter);
我们使用streams
(流队列;push
位于后面,shift
位于前面)、isStreaming
和 streamsEnded
来跟踪状态。当我们得到一个新的流时,我们推送它,当一个流结束时,我们停止监听并转移它。当流结束时,我们设置streamsEnded
。
在每个事件中,我们检查我们所处的状态。如果我们已经在流式传输(管道传输),我们什么也不做。如果队列为空并且设置了streamsEnded
,我们将发出end
事件。如果队列中有东西,我们会恢复它并监听它的事件。
*请注意,pause
和 resume
是建议性的,因此某些流可能无法正确运行,并且需要缓冲。这个练习留给读者。
完成所有这些后,我将通过构造一个EventEmitter
来处理n=2
的情况,用它创建一个ConcatStream
,并发出两个stream
事件,然后是一个end
事件。我敢肯定它可以做得更简洁,但我们不妨使用我们所拥有的。
【讨论】:
谢谢亚伦!我有点希望有一些现有的库,所以我可以用三行来解决它。如果没有,我想我可能会将您的解决方案提取到一个包中。我可以在 MIT 许可下使用你的代码吗? 啊,找到stream-stream库了。看我的回答。 @JoLiss 我也先找了一些东西,但我没有找到那个选项。如果您仍然愿意,当然可以在库中使用我的代码。【参考方案2】:combined-stream 包连接流。自述文件中的示例:
var CombinedStream = require('combined-stream');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
我相信您必须一次附加所有流。如果队列为空,combinedStream
将自动结束。见issue #5。
stream-stream 库是具有显式 .end
的替代方案,但它不太受欢迎,而且可能没有经过充分测试。它使用 Node 0.10 的 streams2 API(参见this discussion)。
【讨论】:
combined-stream
包已经支持在回调函数中添加源流,因此您不必在开始时启动它们,这有助于节省内存、文件描述符等。此外,还有很多更受欢迎的库 multistream 似乎经过更多测试【参考方案3】:
streamee.js 是一组基于 node 1.0+ 流的流转换器和作曲家,包括一个连接方法:
var stream1ThenStream2 = streamee.concatenate([stream1, stream2]);
【讨论】:
谢谢,我去看看。我假设那是 Node 0.10? 是 Node 0.10,但您可以将旧式流包装成 README 中所写的 0.10+ 流【参考方案4】:https://github.com/joepie91/node-combined-stream2 是组合流模块(如上文所述)的插入式 Streams2 兼容替代品。它自动包装 Streams1 流。
combined-stream2 的示例代码:
var CombinedStream = require('combined-stream2');
var fs = require('fs');
var combinedStream = CombinedStream.create();
combinedStream.append(fs.createReadStream('file1.txt'));
combinedStream.append(fs.createReadStream('file2.txt'));
combinedStream.pipe(fs.createWriteStream('combined.txt'));
【讨论】:
【参考方案5】:这可以用 vanilla nodejs 完成
import PassThrough from 'stream'
const merge = (...streams) =>
let pass = new PassThrough()
let waiting = streams.length
for (let stream of streams)
pass = stream.pipe(pass, end: false)
stream.once('end', () => --waiting === 0 && pass.emit('end'))
return pass
【讨论】:
如果一个流永远不会结束,而另一个流会怎样 只是更新pass.emit('end')
不起作用。试试pass.end()
将 --waiting
更改为 waiting--
@TomLarkworthy 这是对@PirateApp 的回应吗?如果不是,那么我不明白为什么要这样做,因为这会导致最后一个流永远不会结束
这个解决方案很好,但由于某些奇怪的原因,我的使用中没有保持流的顺序。调用merge(a, b)
产生了一个流,其中b
在a
之前。是否与b
与a
相比,它的项目流少得多,并且先结束这一事实有关?【参考方案6】:
如果您不关心流中数据的顺序,在nodejs 中进行简单的reduce 操作应该没问题!
const PassThrough = require('stream')
let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) =>
s.pipe(pt, end: false)
s.once('end', () => a.every(s => s.ended) && pt.emit('end'))
return pt
, new PassThrough())
干杯 ;)
【讨论】:
你不应该从reduce返回一些东西吗?这看起来像joined
将是未定义的。
警告:这将导致所有流并行传输到 PassThrough 流,而不考虑数据的顺序,很可能会损坏您的数据。
@LeonLi 这确实是这种方法的目的。如果您想保留顺序,您可以将不同于 PassThrough 的初始值传递给您的 reduce 函数;)
@Ivo 这个问题询问的是 concatenation。因此,大多数到达此 QA 的读者都会关心订购。这个答案默默地误导了那些读者,因为流成功地通过了,但除非你检查输出,否则你永远不会知道它也会混淆你的所有数据(这个问题首先特别要求避免!)。我敦促您将此信息添加到答案正文中。
没有stream.ended
这样的东西。您必须在结束事件处理程序中设置 s.ended = true
。【参考方案7】:
在 vanilla nodejs 中使用 ECMA 15+ 并结合 Ivo 和 Fengstrong> 的好答案。
PassThrough
类是一个普通的Transform
流,它不会以任何方式修改流。
const PassThrough = require('stream');
const concatStreams = (streamArray, streamCounter = streamArray.length) => streamArray
.reduce((mergedStream, stream) =>
// pipe each stream of the array into the merged stream
// prevent the automated 'end' event from firing
mergedStream = stream.pipe(mergedStream, end: false );
// rewrite the 'end' event handler
// Every time one of the stream ends, the counter is decremented.
// Once the counter reaches 0, the mergedstream can emit its 'end' event.
stream.once('end', () => --streamCounter === 0 && mergedStream.emit('end'));
return mergedStream;
, new PassThrough());
可以这样使用:
const mergedStreams = concatStreams([stream1, stream2, stream3]);
【讨论】:
这会在流完成之前对流进行管道化,将它们混杂在一起;这正是最初的问题要避免的问题 - 如何连接而不是混乱流。 为了避免这种情况,你应该在前一个触发'end'事件之后stream.pipe下一个【参考方案8】:下面的代码对我有用:)。已从之前给出的所有答案中获取输入
const pipeStreams = (streams) =>
const out = new PassThrough()
// Piping the first stream to the out stream
// Also prevent the automated 'end' event of out stream from firing
streams[0].pipe(out, end: false )
for (let i = 0; i < streams.length - 2; i++)
// On the end of each stream (until the second last) pipe the next stream to the out stream
// Prevent the automated 'end' event of out stream from firing
streams[i].on('end', () =>
streams[i + 1].pipe(out, end: false )
)
// On the end of second last stream pipe the last stream to the out stream.
// Don't prevent the 'end flag from firing'
streams[streams.length - 2].on('end', () =>
streams[streams.length - 1].pipe(out)
)
return out
【讨论】:
【参考方案9】:这里两个最受好评的答案都不适用于异步流,因为它们只是通过管道传输内容,而不管源流是否已准备好生成。我必须将内存中的字符串流与来自数据库的数据馈送相结合,并且数据库内容始终位于结果流的末尾,因为它需要一秒钟才能获得数据库响应。这就是我最终为我的目的而写的内容。
export function joinedStream(...streams: Readable[]): Readable
function pipeNext(): void
const nextStream = streams.shift();
if (nextStream)
nextStream.pipe(out, end: false );
nextStream.on('end', function()
pipeNext();
);
else
out.end();
const out = new PassThrough();
pipeNext();
return out;
【讨论】:
【参考方案10】:现在可以使用异步迭代器轻松完成此操作
async function* concatStreams(readables)
for (const readable of readables)
for await (const chunk of readable) yield chunk
你可以这样使用它
const fs = require('fs')
const stream = require('stream')
const files = ['file1.txt', 'file2.txt', 'file3.txt']
const iterable = await concatStreams(files.map(f => fs.createReadStream(f)))
// convert the async iterable to a readable stream
const mergedStream = stream.Readable.from(iterable)
有关异步迭代器的更多信息:https://2ality.com/2019/11/nodejs-streams-async-iteration.html
【讨论】:
createReadStream
返回的流不可迭代。
你的意思是mergedStream吗?因为我可以毫无问题地迭代它gist.github.com/ducaale/5e3fd00a70487c98333e5fb42bc4b624
如果不需要订单,ss可以等待所有的,以便并行运行,从而更快?
当然,您可以通过await readable.next()
手动从两个异步迭代器中获取下一个项目,然后获取第一个解析的项目。
这个选项在我看来是最好的,但打字稿实现必须在签名上添加...
,像这样ts async function* concatStreams(...readables)
以上是关于连接两个(或 n 个)流的主要内容,如果未能解决你的问题,请参考以下文章
Kotlin 协程Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )
Kotlin 协程Flow 流展平 ( 连接模式 flatMapConcat | 合并模式 flatMapMerge | 最新展平模式 flatMapLatest )