无法将来自同一广播流的多个流添加到 StreamController
Posted
技术标签:
【中文标题】无法将来自同一广播流的多个流添加到 StreamController【英文标题】:Cannot add multiple streams derived from the same broadcast stream to a StreamController 【发布时间】:2019-08-21 10:20:45 【问题描述】:我正在实现一个标记器。它解析文档,在一组可能的分隔符上对其进行标记,然后为我提供 1、2 和 3 词标记的组合。 我能够实现我的目标,但只能通过一种特定方式:
Stream<String> contentStr = file.openRead().transform(utf8.decoder);
Stream<String> tokens = contentStr.transform(charSplitter).transform(tokenizer).asBroadcastStream();
var twoWordTokens = tokens.transform(sliding(2));
var threeWordTokens = tokens.transform(sliding(3));
StreamController<String> merger = StreamController();
tokens.forEach((token) => merger.add(token));
threeWordTokens.forEach((token) => merger.add(token));
twoWordTokens.forEach((token) => merger.add(token));
merger.stream.forEach(print);
如您所见,我执行以下操作:
广播原始令牌流 通过滑动窗口转换将其转换为 2 个额外的流 创建一个StreamConsumer
(准确地说是StreamController
)并将每个事件从 3 个流发送到该流使用者。
然后我打印流消费者的每个元素以进行测试
它有效,但我不喜欢通过StreamConsumer.add
方法从源流中添加每个元素。我想改用StreamController.addStream
,但不知何故不起作用。
下面的代码给了我一个Bad state: Cannot add event while adding a stream
错误,我明白为什么:
StreamController<String> merger = StreamController();
merger.addStream(tokens);
merger.addStream(twoWordTokens);
merger.addStream(threeWordTokens);
merger.stream.forEach(print);
这是StreamController.addStream
中的API documentation。
所以我需要等待每个addStream
返回未来完成:
StreamController<String> merger = StreamController();
await merger.addStream(tokens);
await merger.addStream(twoWordTokens);
await merger.addStream(threeWordTokens);
await merger.stream.forEach(print);
但在这种情况下,我没有在控制台中打印任何内容。
如果我这样做:
StreamController<String> merger = StreamController();
merger.stream.forEach(print);
await merger.addStream(tokens);
await merger.addStream(twoWordTokens);
await merger.addStream(threeWordTokens);
然后只打印 1 个单词的标记,即原始广播流的元素。派生流的元素不是。
我有点理解为什么会发生这种情况,因为我所有的流都是从原始广播流派生的。
有没有更好的方法来实现这样的管道?
可能我的问题可以在流复制/分叉方面重新表述,但我看不到在 Dart 中克隆流的方法。如果您可以就此提出建议 - 请这样做。
【问题讨论】:
在你的最后一个例子中,你能尝试移动“merger.stream.forEach(print);”吗?在第一次 addStream 调用之前,并且也使 forEach 调用不等待。我的理论是 addStream 调用正在等待事件被处理,但由于流上没有侦听器,所以调用只是永远等待。 我在发布问题之前尝试过。然后打印所有 1 字标记,执行完成。所以没有处理其他 2 个流。我也有点理解为什么,因为首先await
然后消耗整个广播流,但我不确定我是否完全理解。同样关于the call are just waiting forever
- 在这种情况下,执行不应该在控制台中使用Process finished with exit code 0
完成,但确实如此。一般来说,查看StreamController.addStream
文档,它看起来很简单,但实际上我不能使用它......
它将永远等待,因为您实际上从未close 令牌流。如addStream
文档中所述:Returns a future which completes when the source stream is done.
@Mattia 我在开头又添加了一行:Stream<String> contentStr = file.openRead().transform(utf8.decoder);
。因此,您最初可以看到文件中的所有流。我认为当文件被完全读取时,文件流将被完成,然后所有派生流也将自动完成。不是吗?
是的,我的错,顺便说一句,你可以像这样使用stream_transform 中的mergeAll:tokens.transform(mergeAll[stream1, stream2, ...]).listen(print)
从来没有尝试过这样艰难的事情
【参考方案1】:
我希望在某个时候允许并发 addStream
,但在那之前,您需要单独处理事件:
var allAdds = [
tokens.forEach(merger.add),
twoWordTokens.forEach(merger.add),
threeWordTokens.forEach(merger.add)];
Future.wait(allAdds).then((_) merger.close(); );
merger.stream.forEach(print);
如果你想自己控制一切。您还可以使用来自package:async
的StreamGroup
类。它收集多个流并将它们的事件作为单个流发出。
这假设您没有错误事件。
【讨论】:
看起来不错,但是当我尝试在merger.close();
行上设置断点时,它永远不会到达那里。或者,如果我在那里添加一些打印语句,它永远不会打印任何内容。所以我怀疑流控制器实际上并没有关闭。任何想法为什么?
抱歉,我在我的代码中发现了一个错误。在我的一个变压器中,我忘记关闭内部流控制器。所以一切都很好。感谢StreamGroup
参考。以上是关于无法将来自同一广播流的多个流添加到 StreamController的主要内容,如果未能解决你的问题,请参考以下文章
使用 Nginx RTMP 模块联播多个广播公司流的最佳方法是啥?考虑使用 Docker,但这似乎有点矫枉过正