无法将来自同一广播流的多个流添加到 StreamController

Posted

技术标签:

【中文标题】无法将来自同一广播流的多个流添加到 StreamController【英文标题】:Cannot add multiple streams derived from the same broadcast stream to a StreamController 【发布时间】:2019-08-21 10:20:45 【问题描述】:

我正在实现一个标记器。它解析文档,在一组可能的分隔符上对其进行标记,然后为我提供 1、2 和 3 词标记的组合。 我能够实现我的目标,但只能通过一种特定方式:

  Stream<String> contentStr = file.openRead().transform(utf8.decoder);
  Stream<String> tokens = contentStr.transform(charSplitter).transform(tokenizer).asBroadcastStream();
  var twoWordTokens = tokens.transform(sliding(2));
  var threeWordTokens = tokens.transform(sliding(3));
  StreamController<String> merger = StreamController();
  tokens.forEach((token) => merger.add(token));
  threeWordTokens.forEach((token) => merger.add(token));
  twoWordTokens.forEach((token) => merger.add(token));
  merger.stream.forEach(print);

如您所见,我执行以下操作:

广播原始令牌流 通过滑动窗口转换将其转换为 2 个额外的流 创建一个StreamConsumer(准确地说是StreamController)并将每个事件从 3 个流发送到该流使用者。 然后我打印流消费者的每个元素以进行测试

它有效,但我不喜欢通过StreamConsumer.add 方法从源流中添加每个元素。我想改用StreamController.addStream,但不知何故不起作用。 下面的代码给了我一个Bad state: Cannot add event while adding a stream 错误,我明白为什么:

  StreamController<String> merger = StreamController();
  merger.addStream(tokens);
  merger.addStream(twoWordTokens);
  merger.addStream(threeWordTokens);
  merger.stream.forEach(print);

这是StreamController.addStream 中的API documentation。 所以我需要等待每个addStream返回未来完成:

StreamController<String> merger = StreamController();
await merger.addStream(tokens);
await merger.addStream(twoWordTokens);
await merger.addStream(threeWordTokens);
await merger.stream.forEach(print);

但在这种情况下,我没有在控制台中打印任何内容。

如果我这样做:

StreamController<String> merger = StreamController();
merger.stream.forEach(print);
await merger.addStream(tokens);
await merger.addStream(twoWordTokens);
await merger.addStream(threeWordTokens);

然后只打印 1 个单词的标记,即原始广播流的元素。派生流的元素不是。

我有点理解为什么会发生这种情况,因为我所有的流都是从原始广播流派生的。

有没有更好的方法来实现这样的管道?

可能我的问题可以在流复制/分叉方面重新表述,但我看不到在 Dart 中克​​隆流的方法。如果您可以就此提出建议 - 请这样做。

【问题讨论】:

在你的最后一个例子中,你能尝试移动“merger.stream.forEach(print);”吗?在第一次 addStream 调用之前,并且也使 forEach 调用不等待。我的理论是 addStream 调用正在等待事件被处理,但由于流上没有侦听器,所以调用只是永远等待。 我在发布问题之前尝试过。然后打印所有 1 字标记,执行完成。所以没有处理其他 2 个流。我也有点理解为什么,因为首先await 然后消耗整个广播流,但我不确定我是否完全理解。同样关于the call are just waiting forever - 在这种情况下,执行不应该在控制台中使用Process finished with exit code 0 完成,但确实如此。一般来说,查看StreamController.addStream 文档,它看起来很简单,但实际上我不能使用它...... 它将永远等待,因为您实际上从未close 令牌流。如addStream 文档中所述:Returns a future which completes when the source stream is done. @Mattia 我在开头又添加了一行:Stream&lt;String&gt; contentStr = file.openRead().transform(utf8.decoder);。因此,您最初可以看到文件中的所有流。我认为当文件被完全读取时,文件流将被完成,然后所有派生流也将自动完成。不是吗? 是的,我的错,顺便说一句,你可以像这样使用stream_transform 中的mergeAll:tokens.transform(mergeAll[stream1, stream2, ...]).listen(print) 从来没有尝试过这样艰难的事情 【参考方案1】:

我希望在某个时候允许并发 addStream,但在那之前,您需要单独处理事件:

var allAdds = [
    tokens.forEach(merger.add), 
    twoWordTokens.forEach(merger.add), 
    threeWordTokens.forEach(merger.add)]; 
Future.wait(allAdds).then((_)  merger.close(); );

merger.stream.forEach(print);

如果你想自己控制一切。您还可以使用来自package:asyncStreamGroup 类。它收集多个流并将它们的事件作为单个流发出。

这假设您没有错误事件。

【讨论】:

看起来不错,但是当我尝试在merger.close(); 行上设置断点时,它永远不会到达那里。或者,如果我在那里添加一些打印语句,它永远不会打印任何内容。所以我怀疑流控制器实际上并没有关闭。任何想法为什么? 抱歉,我在我的代码中发现了一个错误。在我的一个变压器中,我忘记关闭内部流控制器。所以一切都很好。感谢StreamGroup 参考。

以上是关于无法将来自同一广播流的多个流添加到 StreamController的主要内容,如果未能解决你的问题,请参考以下文章

Flink Broadcast State 实战指南

使用 Nginx RTMP 模块联播多个广播公司流的最佳方法是啥?考虑使用 Docker,但这似乎有点矫枉过正

播放广播流的 Google Action 在 Google Home 设备上以暂停状态开始

处理 flink 广播流中的大数据

使用 hotwire 广播到多个 turbo 流

显示来自 html5 音频流的当前歌曲信息(在 Meteor 应用程序中)