如何将多个 Stream 合并为更高级别的 Stream?

Posted

技术标签:

【中文标题】如何将多个 Stream 合并为更高级别的 Stream?【英文标题】:How can I merge multiple Streams into a higher level Stream? 【发布时间】:2016-04-12 11:26:58 【问题描述】:

我有两个流,Stream<A>Stream<B>。我有一个类型为C 的构造函数,它采用AB。如何将两个Streams 合并为一个Stream<C>

【问题讨论】:

【参考方案1】:
import 'dart:async' show Stream;
import 'package:async/async.dart' show StreamGroup;

main() async 
  var s1 = stream(10);
  var s2 = stream(20);
  var s3 = StreamGroup.merge([s1, s2]);
  await for(int val in s3) 
    print(val);
  


Stream<int> stream(int min) async* 
  int i = min;
  while(i < min + 10) 
    yield i++;
  

另见http://news.dartlang.org/2016/03/unboxing-packages-async-part-2.html

打印

10
20
11
21
12
22
13
23
14
24
15
25
16
26
17
27
18
28
19
29

【讨论】:

我还没有机会查看那个库,但我要谨慎的一件事是单个订阅流及其缓冲事件的能力。如果许多事件被传递到一个流,而另一个流被饿死,则第一个必须在等待第二个事件时进行缓冲。 或者他们暂停订阅。如果StreamController 设置正确,它在暂停时不应产生事件(可能不可能,具体取决于用例)。 大概如果需要进行这种处理,可以得到关于事件到达 A 和 B 流的一些保证;并定义发生错误的条件。 @ArgentiApparatus 从表面上看,@lrn 对 StreamZipper 的使用使我不得不让 Stream&lt;A&gt;Stream&lt;B&gt; 上的更改数量完全相等并且在时间上匹配。流的数据源是 Firebase 数据库中的不同点,因此更新是在随机时间点进行的,而且数量肯定不相等。所以,不,不是。无论如何我都会接受它,因为它回答了所问的问题。 @Brett StreamZip 不要求 A 和 B 上的事件同时到达。当它从一个流接收事件时,它会等待来自另一个流的事件。我不确定当一个流关闭时 StreamZip 会做什么,但我认为它会停止收听其他流。但是,如果您的 A 和 B 事件没有真正“配对”,我认为压缩流方法不适合您。【参考方案2】:

您可以在package:async 中使用StreamZip 将两个流组合成一对流,然后从中创建C 对象。

import "package:async" show StreamZip;
...
Stream<C> createCs(Stream<A> as, Stream<B> bs) =>
  new StreamZip([as, bs]).map((ab) => new C(ab[0], ab[1]));

【讨论】:

但是请注意,StreamZip 只发出对。如果 As 和 Bs 以不同的速率发射,C 将等到它们都发射了一个值,然后再发射一个合并的对。这可能是也可能不是人们想要/期望的。 不过,我没有看到任何替代方案。在 A` 和 B 都可用之前,您无法创建新的 C 对象。除非您每次AB 到达时都创建一个新的C,否则使用另一个的旧值。如果您在第一个 B 之前获得两个 As,那仍然会出现问题。 @Irn combineLatest 通过等待每个流发出至少一个值来解决此问题:pub.dev/documentation/stream_transform/latest/stream_transform/…【参考方案3】:

如果您需要在Stream&lt;A&gt;Stream&lt;B&gt; 发出事件时做出反应并使用两个流中的最新值,请使用combineLatest

Stream<C> merge(Stream<A> streamA, Stream<B> streamB) 
  return streamA
    .combineLatest(streamB, (a, b) => new C(a, b));

【讨论】:

这只会在两个流都发出至少一个事件时发出事件,因此当只有一个流发出事件时,您不会收到通知。有人可能会争辩说,这是自然的解释,我想,例如,我只会得到 Stream A 的结果和 Stream B 的 null ,但事实并非如此。 @scrimau 好点。如果这是一个问题,您可能需要使用 startWith【参考方案4】:

适用于需要组合两个以上不同类型的流并在每次更新任何流时获取所有最新值的人。

import 'package:stream_transform/stream_transform.dart';

Stream<List> combineLatest(Iterable<Stream> streams) 
  final Stream<Object> first = streams.first.cast<Object>();
  final List<Stream<Object>> others = [...streams.skip(1)];
  return first.combineLatestAll(others);

合并的流将产生:

streamA:  a----b------------------c--------d---|
streamB:  --1---------2-----------------|
streamC:  -------&----------%---|
combined: -------b1&--b2&---b2%---c2%------d2%-|

为什么不StreamZip?因为StreamZip 会产生:

streamA:  a----b------------------c--------d---|
streamB:  --1---------2-----------------|
streamC:  -------&----------%---|
combined: -------a1&-------b2%--|

用法:

Stream<T> sA;
Stream<K> sB;
Stream<Y> sC;
combineLatest([sA, sB, sC]).map((data) 
  T resA = data[0];
  K resB = data[1];
  Y resC = data[2];
  return D(resA, resB, resC);
);

【讨论】:

【参考方案5】:

要在第二个流从第一个获取结果时合并两个流,请使用 asyncExpand

  Stream<UserModel?> getCurrentUserModelStream() 
    return FirebaseAuth.instance.authStateChanges().asyncExpand<UserModel?>(
      (currentUser) 
        if (currentUser == null) 
          return Stream.value(null);
        
        return FirebaseFirestore.instance
            .collection('users')
            .doc(currentUser.uid)
            .snapshots()
            .map((doc) 
          final userData = doc.data();
          if (userData == null) 
            return null;
          
          return UserModel.fromJson(userData);
        );
      ,
    );
  

【讨论】:

以上是关于如何将多个 Stream 合并为更高级别的 Stream?的主要内容,如果未能解决你的问题,请参考以下文章

Java 8 Stream 映射调用字符串与合并为一个 [重复]

为啥不在 Stream API 中使用 reduce 来合并多个映射?

在 Delphi 中是不是可以将枚举合并为更大的枚举?

如何将多个查找和替换转换为更有效的循环

Java中如何将两个字符串合并,并且把重复的元素去掉,不能用任何排序指令那些,纯手打写出来。

在线性时间内将小的排序列表合并为更大的排序列表的算法,没有重复项