结合两个可观察序列
Posted
技术标签:
【中文标题】结合两个可观察序列【英文标题】:Combining Two Observable Sequences 【发布时间】:2021-12-21 17:18:51 【问题描述】:我最近重新开始使用 RX 进行编程,但遇到了组合两个序列的问题。
我的序列 o1 看起来像这样:
var o1 = Observable.Interval(TimeSpan.FromSeconds(7))
.Select(i => i + 2)
.Take(2)
.StartWith(1);
我的序列 o2 看起来像这样:
var o2 = Observable.Interval(TimeSpan.FromSeconds(3))
.Delay(TimeSpan.FromSeconds(1))
.Select(i => i + 2)
.Take(4)
.StartWith(1);
大致对应这个大理石图:
o1: 1 - - - - - - 2 - - - - - - 3 -
o2: - a - - b - - c - - d - - e - -
o3: - 1a- - - - - 2c- - - - - - 3e-
我只是在寻找序列 o3 但我似乎无法解决。 Zip
和 CombineLatest
本身都不能胜任这项工作。
【问题讨论】:
o2
通过Subscribe()
将最后一个值保存到普通变量/字段/属性中是否“可以”?还是必须是一个组合的 observable?
我更喜欢组合的 observable。
您在搜索WithLatestFrom
运算符吗?
@TheodorZoulias 这正是我想要的。如果您有兴趣将其添加为答案,我会接受。
【参考方案1】:
您可能正在搜索WithLatestFrom
运算符。
通过将来自第一个源的每个元素与来自第二个源的最新元素(如果有)组合,将两个可观察序列合并为一个可观察序列。从 Rx.NET 4.0 开始,这将在订阅 first 之前订阅 second,以便在第一次立即发出元素的情况下随时提供最新的元素。
在最简单的形式中,它具有以下签名:
public static IObservable<(TFirst First, TSecond Second)> WithLatestFrom<TFirst, TSecond>(
this IObservable<TFirst> first, IObservable<TSecond> second);
生成的序列发出 ValueTuple<TFirst,TSecond>
类型的元素。
【讨论】:
这确实产生了1a
、2b
,最后是3e
——这不完全是OP所追求的。
@Enigmativity 是的,我注意到了。 2
和 c
的值大致同时发出,因此在观察 2b
或 2c
之间存在竞争条件。【参考方案2】:
您可以使用CombineLatest()
、Buffer()
、Where()
和Select()
的组合来构建可观察的o3
:
CombineLatest()
- 只需将两个 observable o1
和 o2
组合到您想要的任何数据结构(我使用了 Tuple
)。
Buffer(2,1)
- 构建一个“滑动窗口”以查看以前和当前的 Tuple
。
Where()
- 过滤“滑动窗口”,你只会得到一个滑动窗口,其中前一个Tuple
的第一个元素(来自o1
)与当前的第一个元素(同样来自o1
)不同Tuple
,所以你知道发生了变化,不管 o2
在此期间做什么。
Select()
- 只需选择当前(或以前)Tuple
。
observable 可能如下所示:
var o1 = Observable.Interval(TimeSpan.FromSeconds(7))
.Select(i => i + 2)
.Take(2)
.StartWith(1)
.Do(it =>
Console.WriteLine("-- o1 triggered: "+it);
);
var o2 = Observable.Interval(TimeSpan.FromSeconds(3))
.Delay(TimeSpan.FromSeconds(1))
.Select(i => i + 2)
.Take(4)
.StartWith(1)
.Do(it =>
Console.WriteLine("-- o2 triggered: "+it);
);
o1.CombineLatest(o2, Tuple.Create)
.StartWith(Tuple.Create(0L, 0L))
.Buffer(2, 1)
.Do(it =>
Console.WriteLine("-- After Buffer: "+String.Join(",",it));
)
.Where(it =>
if (it.Count != 2)
return false;
return it[0].Item1 != it[1].Item1;
)
.Select(it => it[1])
.Subscribe(it =>
Console.WriteLine("Final: "+it);
);
这将生成以下输出:
-- o1 triggered: 1
-- o2 triggered: 1
-- After Buffer: (0, 0),(1, 1)
Final: (1, 1)
-- o2 triggered: 2
-- After Buffer: (1, 1),(1, 2)
-- o1 triggered: 2
-- After Buffer: (1, 2),(2, 2)
Final: (2, 2)
-- o2 triggered: 3
-- After Buffer: (2, 2),(2, 3)
-- o2 triggered: 4
-- After Buffer: (2, 3),(2, 4)
-- o2 triggered: 5
-- After Buffer: (2, 4),(2, 5)
-- o1 triggered: 3
-- After Buffer: (2, 5),(3, 5)
Final: (3, 5)
-- After Buffer: (3, 5)
您可能需要根据实际需要调整/添加/删除StartWith()
调用和/或更改Select()
调用以获取以前或当前的Tuple
。
【讨论】:
【参考方案3】:这对于两个示例源 observables 可以正常工作:
IObservable<long> o1 =
Observable
.Interval(TimeSpan.FromSeconds(7))
.Select(i => i + 2)
.Take(2)
.StartWith(1);
IObservable<char> o2 =
Observable
.Interval(TimeSpan.FromSeconds(3))
.Delay(TimeSpan.FromSeconds(1))
.Select(i => i + 2)
.Take(4)
.StartWith(1)
.Select(x => (char)('a' + x - 1));
IObservable<string> o3 =
from x1 in o1
join x2 in o2
on Observable.Timer(TimeSpan.FromSeconds(2.0))
equals Observable.Timer(TimeSpan.FromSeconds(2.0))
select $"x1x2";
o3.Subscribe(Console.WriteLine);
我得到的输出是:
1a
2c
3e
【讨论】:
感谢您的回答。以上是关于结合两个可观察序列的主要内容,如果未能解决你的问题,请参考以下文章