结合两个可观察序列

Posted

技术标签:

【中文标题】结合两个可观察序列【英文标题】:Combining Two Observable Sequences 【发布时间】:2021-12-21 17:18:51 【问题描述】:

我最近重新开始使用 RX 进行编程,但遇到了组合两个序列的问题。

我的序列 o1 看起来像这样:

var o1 = Observable.Interval(TimeSpan.FromSeconds(7))
                   .Select(i => i + 2)
                   .Take(2)
                   .StartWith(1);

我的序列 o2 看起来像这样:

var o2 = Observable.Interval(TimeSpan.FromSeconds(3))
                   .Delay(TimeSpan.FromSeconds(1))
                   .Select(i => i + 2)
                   .Take(4)
                   .StartWith(1);

大致对应这个大理石图:

o1: 1 - - - - - - 2 - - - - - - 3 -
o2: - a - - b - - c - - d - - e - -
o3: - 1a- - - - - 2c- - - - - - 3e-

我只是在寻找序列 o3 但我似乎无法解决。 ZipCombineLatest 本身都不能胜任这项工作。

【问题讨论】:

o2 通过Subscribe() 将最后一个值保存到普通变量/字段/属性中是否“可以”?还是必须是一个组合的 observable? 我更喜欢组合的 observable。 您在搜索WithLatestFrom 运算符吗? @TheodorZoulias 这正是我想要的。如果您有兴趣将其添加为答案,我会接受。 【参考方案1】:

您可能正在搜索WithLatestFrom 运算符。

通过将来自第一个源的每个元素与来自第二个源的最新元素(如果有)组合,将两个可观察序列合并为一个可观察序列。从 Rx.NET 4.0 开始,这将在订阅 first 之前订阅 second,以便在第一次立即发出元素的情况下随时提供最新的元素。

在最简单的形式中,它具有以下签名:

public static IObservable<(TFirst First, TSecond Second)> WithLatestFrom<TFirst, TSecond>(
    this IObservable<TFirst> first, IObservable<TSecond> second);

生成的序列发出 ValueTuple&lt;TFirst,TSecond&gt; 类型的元素。

【讨论】:

这确实产生了1a2b,最后是3e——这不完全是OP所追求的。 @Enigmativity 是的,我注意到了。 2c 的值大致同时发出,因此在观察 2b2c 之间存在竞争条件。【参考方案2】:

您可以使用CombineLatest()Buffer()Where()Select() 的组合来构建可观察的o3

CombineLatest() - 只需将两个 observable o1o2 组合到您想要的任何数据结构(我使用了 Tuple)。 Buffer(2,1) - 构建一个“滑动窗口”以查看以前和当前的 TupleWhere() - 过滤“滑动窗口”,你只会得到一个滑动窗口,其中前一个Tuple 的第一个元素(来自o1)与当前的第一个元素(同样来自o1)不同Tuple,所以你知道发生了变化,不管 o2 在此期间做什么。 Select() - 只需选择当前(或以前)Tuple

observable 可能如下所示:

var o1 = Observable.Interval(TimeSpan.FromSeconds(7))
       .Select(i => i + 2)
       .Take(2)
       .StartWith(1)
       .Do(it => 
           Console.WriteLine("-- o1 triggered: "+it);
       );
var o2 = Observable.Interval(TimeSpan.FromSeconds(3))
       .Delay(TimeSpan.FromSeconds(1))
       .Select(i => i + 2)
       .Take(4)
       .StartWith(1)
       .Do(it => 
           Console.WriteLine("-- o2 triggered: "+it);
       );
o1.CombineLatest(o2, Tuple.Create)
    .StartWith(Tuple.Create(0L, 0L))
    .Buffer(2, 1)
    .Do(it => 
        Console.WriteLine("-- After Buffer: "+String.Join(",",it));
    )
    .Where(it => 
        if (it.Count != 2) 
            return false;
        
        return it[0].Item1 != it[1].Item1;
    )
    .Select(it => it[1])
    .Subscribe(it => 
        Console.WriteLine("Final: "+it);
    );

这将生成以下输出:

-- o1 triggered: 1
-- o2 triggered: 1
-- After Buffer: (0, 0),(1, 1)
Final: (1, 1)
-- o2 triggered: 2
-- After Buffer: (1, 1),(1, 2)
-- o1 triggered: 2
-- After Buffer: (1, 2),(2, 2)
Final: (2, 2)
-- o2 triggered: 3
-- After Buffer: (2, 2),(2, 3)
-- o2 triggered: 4
-- After Buffer: (2, 3),(2, 4)
-- o2 triggered: 5
-- After Buffer: (2, 4),(2, 5)
-- o1 triggered: 3
-- After Buffer: (2, 5),(3, 5)
Final: (3, 5)
-- After Buffer: (3, 5)

您可能需要根据实际需要调整/添加/删除StartWith() 调用和/或更改Select() 调用以获取以前或当前的Tuple

【讨论】:

【参考方案3】:

这对于两个示例源 observables 可以正常工作:

IObservable<long>  o1 =
    Observable
        .Interval(TimeSpan.FromSeconds(7))
        .Select(i => i + 2)
        .Take(2)
        .StartWith(1);

IObservable<char> o2 =
    Observable
        .Interval(TimeSpan.FromSeconds(3))
        .Delay(TimeSpan.FromSeconds(1))
        .Select(i => i + 2)
        .Take(4)
        .StartWith(1)
        .Select(x => (char)('a' + x - 1));

IObservable<string> o3 =
    from x1 in o1
    join x2 in o2
        on Observable.Timer(TimeSpan.FromSeconds(2.0))
        equals Observable.Timer(TimeSpan.FromSeconds(2.0))
    select $"x1x2";

o3.Subscribe(Console.WriteLine);

我得到的输出是:

1a
2c
3e

【讨论】:

感谢您的回答。

以上是关于结合两个可观察序列的主要内容,如果未能解决你的问题,请参考以下文章

RxSwift:结合不同类型的可观察对象和映射结果

如何创建一个表示其他两个可观察对象完成的可观察对象?

如何合并两个可观察的早期完成

在熊猫中结合两个时间序列

RxJS - 连接并合并两个可观察对象

使用 Linq 一次更新可观察集合项的两个字段