如何传播可观察序列,但前提是它以特定的值集合开始?
Posted
技术标签:
【中文标题】如何传播可观察序列,但前提是它以特定的值集合开始?【英文标题】:How to propagate an observable sequence, but only if it starts with a specific collection of values? 【发布时间】:2021-12-18 16:08:36 【问题描述】:这是一个“只是为了好玩”的问题,它扩展和概括了 jackdry 的 a recent Rx question。这里的问题是如何实现一个接收 IObservable<T>
的 Rx 运算符,并且:
-
如果序列的第一个元素与给定集合 (
ICollection<T>
) 中的元素相等且顺序相同,则转发每个元素不变。
否则发出一个空序列。
例如给定值的集合[a, b, c]:
Source sequence: +--a---b----c-----d--e----|
Expected result: +-----------abc---d--e----|
Source sequence: +----a---p----q---r-----|
Expected result: +--------|
Source sequence: +------a------b------|
Expected result: +--------------------|
Source sequence: +---c---a----b--c---d---|
Expected result: +---|
请求操作员的签名:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default);
【问题讨论】:
【参考方案1】:给你:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default) =>
source
.Publish(published =>
from xs in published.Take(expectedFirstElements.Count).ToArray()
from y in
xs.SequenceEqual(expectedFirstElements, comparer)
? xs.ToObservable(Scheduler.Immediate).Concat(published)
: Observable.Empty<T>()
select y);
我试图通过提早失败来提高效率,但每次尝试都降低了效率。
这是我的测试代码:
new[] 1, 2, 3, 4
.ToObservable()
.IfFirstElements(new[] 1, 2, 3 )
.Dump();
Observable
.Interval(TimeSpan.FromSeconds(1.0))
.Take(7)
.Select(x => x + 1)
.IfFirstElements(new long[] 1, 2, 3 )
.Dump();
new[] 2, 2, 3, 4
.ToObservable()
.IfFirstElements(new[] 1, 2, 3 )
.Dump();
LINQPad 需要运行上面的代码来得到这个输出:
要提前结束有点困难,但就是这样:
public static IObservable<T> IfFirstElements<T>(
this IObservable<T> source,
ICollection<T> expectedFirstElements,
IEqualityComparer<T> comparer = default) =>
source
.Publish(published =>
from xs in
published
.Scan(ImmutableList.Create<T>(), (a, b) => a.Add(b))
.TakeUntil(a => a.Zip(expectedFirstElements, (m, n) => comparer == null ? m.Equals(n) : comparer.Equals(m, n)).Any(c => !c))
.Take(expectedFirstElements.Count)
.LastAsync()
from y in
xs.SequenceEqual(expectedFirstElements, comparer)
? xs.ToObservable(Scheduler.Immediate).Concat(published)
: Observable.Empty<T>()
select y);
【讨论】:
不错!不是最有效的(它是一个 O(n²) 算法),但可以完成工作。 @TheodorZoulias -ICollection<T>
上只有 O(n²)
。希望这不会太痛苦。【参考方案2】:
这是另一种方法。这个类似于 Enigmativity 的solution,并且使用几乎相同的运算符。不同之处在于,当source
序列发出一个元素时,仅检查该元素与expectedFirstElements
集合的对应元素是否相等,使其成为O(n) 算法。
/// <summary>
/// If the first elements have the expected values, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElements<T>(this IObservable<T> source,
ICollection<T> expectedFirstElements, IEqualityComparer<T> comparer = default)
comparer ??= EqualityComparer<T>.Default;
return source.Publish(published => published
.Zip(expectedFirstElements)
.TakeWhile(e => comparer.Equals(e.First, e.Second))
.Take(expectedFirstElements.Count)
.Select(e => e.First)
.ToList()
.SelectMany(list => list.Count == expectedFirstElements.Count ?
published.StartWith(list) : Observable.Empty<T>()));
作为旁注,我最初的意图是将IEnumerable<T>
作为预期值的容器。虽然没有急切地实现可枚举,但我找不到解决方案,所以我作弊并将问题改编为我手头的解决方案。 ? 因此容器的类型是ICollection<T>
。
【讨论】:
顺便说一句,在我写这个答案时,我发现Concat
和Merge(1)
运算符之间存在令人惊讶的行为差异。我在 GitHub 上打开了一个关于它的问题 here。以上是关于如何传播可观察序列,但前提是它以特定的值集合开始?的主要内容,如果未能解决你的问题,请参考以下文章
连接两个都具有 subscribeOn 的可观察序列。如何确保我的 observable 在线程上运行?