如何传播可观察序列,但前提是它以特定的值集合开始?

Posted

技术标签:

【中文标题】如何传播可观察序列,但前提是它以特定的值集合开始?【英文标题】:How to propagate an observable sequence, but only if it starts with a specific collection of values? 【发布时间】:2021-12-18 16:08:36 【问题描述】:

这是一个“只是为了好玩”的问题,它扩展和概括了 jackdry 的 a recent Rx question。这里的问题是如何实现一个接收 IObservable<T> 的 Rx 运算符,并且:

    如果序列的第一个元素与给定集合 (ICollection<T>) 中的元素相等且顺序相同,则转发每个元素不变。 否则发出一个空序列。

例如给定值的集合[a, b, c]:

Source sequence: +--a---b----c-----d--e----|
Expected result: +-----------abc---d--e----|
Source sequence: +----a---p----q---r-----|
Expected result: +--------|
Source sequence: +------a------b------|
Expected result: +--------------------|
Source sequence: +---c---a----b--c---d---|
Expected result: +---|

请求操作员的签名:

public static IObservable<T> IfFirstElements<T>(
    this IObservable<T> source,
    ICollection<T> expectedFirstElements,
    IEqualityComparer<T> comparer = default);

【问题讨论】:

【参考方案1】:

给你:

public static IObservable<T> IfFirstElements<T>(
    this IObservable<T> source,
    ICollection<T> expectedFirstElements,
    IEqualityComparer<T> comparer = default) =>
source
    .Publish(published =>
        from xs in published.Take(expectedFirstElements.Count).ToArray()
        from y in
            xs.SequenceEqual(expectedFirstElements, comparer)
            ? xs.ToObservable(Scheduler.Immediate).Concat(published)
            : Observable.Empty<T>()
        select y);

我试图通过提早失败来提高效率,但每次尝试都降低了效率。

这是我的测试代码:

new[]  1, 2, 3, 4 
    .ToObservable()
    .IfFirstElements(new[]  1, 2, 3 )
    .Dump();

Observable
    .Interval(TimeSpan.FromSeconds(1.0))
    .Take(7)
    .Select(x => x + 1)
    .IfFirstElements(new long[]  1, 2, 3 )
    .Dump();

new[]  2, 2, 3, 4 
    .ToObservable()
    .IfFirstElements(new[]  1, 2, 3 )
    .Dump();

LINQPad 需要运行上面的代码来得到这个输出:


要提前结束有点困难,但就是这样:

public static IObservable<T> IfFirstElements<T>(
    this IObservable<T> source,
    ICollection<T> expectedFirstElements,
    IEqualityComparer<T> comparer = default) =>
        source
            .Publish(published =>
                from xs in
                    published
                        .Scan(ImmutableList.Create<T>(), (a, b) => a.Add(b))
                        .TakeUntil(a => a.Zip(expectedFirstElements, (m, n) => comparer == null ? m.Equals(n) : comparer.Equals(m, n)).Any(c => !c))
                        .Take(expectedFirstElements.Count)
                        .LastAsync()
                from y in
                    xs.SequenceEqual(expectedFirstElements, comparer)
                    ? xs.ToObservable(Scheduler.Immediate).Concat(published)
                    : Observable.Empty<T>()
                select y);

【讨论】:

不错!不是最有效的(它是一个 O(n²) 算法),但可以完成工作。 @TheodorZoulias - ICollection&lt;T&gt; 上只有 O(n²)。希望这不会太痛苦。【参考方案2】:

这是另一种方法。这个类似于 Enigmativity 的solution,并且使用几乎相同的运算符。不同之处在于,当source 序列发出一个元素时,仅检查该元素与expectedFirstElements 集合的对应元素是否相等,使其成为O(n) 算法。

/// <summary>
/// If the first elements have the expected values, return the whole sequence.
/// Otherwise, return an empty sequence.
/// </summary>
public static IObservable<T> IfFirstElements<T>(this IObservable<T> source,
    ICollection<T> expectedFirstElements, IEqualityComparer<T> comparer = default)

    comparer ??= EqualityComparer<T>.Default;
    return source.Publish(published => published
        .Zip(expectedFirstElements)
        .TakeWhile(e => comparer.Equals(e.First, e.Second))
        .Take(expectedFirstElements.Count)
        .Select(e => e.First)
        .ToList()
        .SelectMany(list => list.Count == expectedFirstElements.Count ?
            published.StartWith(list) : Observable.Empty<T>()));

作为旁注,我最初的意图是将IEnumerable&lt;T&gt; 作为预期值的容器。虽然没有急切地实现可枚举,但我找不到解决方案,所以我作弊并将问题改编为我手头的解决方案。 ? 因此容器的类型是ICollection&lt;T&gt;

【讨论】:

顺便说一句,在我写这个答案时,我发现ConcatMerge(1) 运算符之间存在令人惊讶的行为差异。我在 GitHub 上打开了一个关于它的问题 here。

以上是关于如何传播可观察序列,但前提是它以特定的值集合开始?的主要内容,如果未能解决你的问题,请参考以下文章

将更改发布到可观察对象的集合

结合两个可观察序列

连接两个都具有 subscribeOn 的可观察序列。如何确保我的 observable 在线程上运行?

如何在一个可观察的查找和移动项目到集合的顶部

如何在可观察集合的 UI 中立即反映任何添加、删除、字段更改

传播与社会影响阅读笔记