Rx.Net 运算符忽略某些值,直到它们在一定时间内相同

Posted

技术标签:

【中文标题】Rx.Net 运算符忽略某些值,直到它们在一定时间内相同【英文标题】:Rx.Net Operator that ignores some values until they are identical during a certain amount of time 【发布时间】:2022-01-12 16:08:49 【问题描述】:

我正在尝试创建一个具有以下行为的 Rx.Net 运算符:

当事件属于“正常类型”时,直接返回该事件 当事件属于“特殊类型”时,等待您在一定时间内收到重复的该类型事件

我想要像下面这样的大理石。

当消息类型为A或B时,我们直接发送。当消息是 C 时,我们要确保它不仅仅是一个传递状态,并且只有在一定时间内是这样的情况下才发送它。这是通过第一个C 和我们收到的当前C 之间的时间来衡量的。在该特定时间之后,所有C 都被“接受”,我们将它们作为正常的传递。

这是我们收到 C 时的样子,它只是传递性的,我们想忽略它。

我尝试用Scan 运算符做一些事情,当我有一个特定的值时,我会尝试返回以前/当前的值,但感觉真的很hacky。

这是我编写的一些代码,用于演示我的尝试。在那种情况下,“特殊类型”只是当值为 999 时,但在运算符中,我想做的可能是另一个测试,甚至是传递给我的运算符的函数。

var oneObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => 999);
var intObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(value => (int)value);

var myObservable = intObservable.Take(4).Concat(oneObservable.Take(3)).Timestamp().Repeat();

var test = myObservable.Scan(
    (previous: default(Timestamped<int>), current: default(Timestamped<int>)),
    (accumulated, current) =>
    
        if (current.Value == 999)
        
            if (accumulated.previous.Value != 999)
            
                return (accumulated.current, current);
            
            
            return (accumulated.previous, current);
        
        else if(accumulated.current.Value == 999)
            return (accumulated.previous, current);
        

        return (accumulated.current, current);
    )
    .Where(
        value => value.current.Value != 999 
        || (value.previous.Value == 999 && value.current.Timestamp - value.previous.Timestamp > TimeSpan.FromSeconds(1.5)))
    .Select(value => value.current);

【问题讨论】:

【参考方案1】:

以下是自定义 IgnoreNonEstablishedContiguousValue 运算符的简单实现,具有理想的功能:

/// <summary>
/// Ignores elements having a specific value, until this value has
/// been repeated contiguously for a specific duration.
/// </summary>
public static IObservable<T> IgnoreNonEstablishedContiguousValue<T>(
    this IObservable<T> source,
    T value,
    TimeSpan dueTimeUntilEstablished,
    IEqualityComparer<T> comparer = default,
    IScheduler scheduler = default)

    // Arguments validation omitted
    comparer ??= EqualityComparer<T>.Default;
    scheduler ??= Scheduler.Default;
    return Observable.Defer(() =>
    
        IStopwatch stopwatch = null;
        return source.Do(item =>
        
            if (comparer.Equals(item, value))
                stopwatch ??= scheduler.StartStopwatch();
            else
                stopwatch = null;
        )
        .Where(_ => stopwatch == null || stopwatch.Elapsed >= dueTimeUntilEstablished);
    );

此实现基于DoWhere 运算符。我不太喜欢使用 Scan 运算符作为构建块,因为它会导致代码冗长且可读性较差,恕我直言。 Observable.Defer 包装器的目的是隔离每个订阅的状态。

使用示例:

var oneObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(_ => 999);
var intObservable = Observable.Interval(TimeSpan.FromSeconds(1)).Select(v => (int)v);

IObservable<int> myObservable = intObservable.Take(4).Concat(oneObservable.Take(3))
    .Repeat()
    .IgnoreNonEstablishedContiguousValue(999, TimeSpan.FromSeconds(1.5));

【讨论】:

@Gimly 我应该指出这个实现的一个小缺陷。如果TimeSpan 非常小(小于~30 毫秒),理论上可以发出第一个特殊值。如果当前线程在创建stopwatch 后立即被操作系统挂起,则可能会发生这种情况。当线程稍后恢复时,stopwatch.Elapsed 属性可能已经大于指定的TimeSpan【参考方案2】:

这是我想出的:

public static IObservable<T> FilterSpecials<T>(this IObservable<T> source,
    Func<T, bool> specialDetector,
    TimeSpan timeUntilEstablished,
    IScheduler scheduler = default)

    return source.FilterSpecials(specialDetector, Observable.Timer(timeUntilEstablished), scheduler);


public static IObservable<T> FilterSpecials<T, U>(this IObservable<T> source,
    Func<T, bool> specialDetector,
    IObservable<U> observeSpecialsUntilEstablished,
    IScheduler scheduler = default)

    scheduler = scheduler ?? Scheduler.Default;

    return source
        .Select(i => (value: i, isSpecial: specialDetector(i)))
        .StartWith((value: default(T), isSpecial: false))
        .Publish(_source => _source
            .Zip(_source.Skip(1))
            .Select((t, index) => (
                newValue: t.Second.value,
                isNewValueSpecial: t.Second.isSpecial,
                isPreviousValueSpecial: t.First.isSpecial,
                isFirstElement: index == 0)
            )
            .SelectMany((t, index) => t.isNewValueSpecial
                ? (t.isFirstElement || !t.isPreviousValueSpecial)       
                    ? _source.SkipUntil(observeSpecialsUntilEstablished).TakeWhile(i => i.isSpecial).Select(i => i.value)   
                    : Observable.Empty<T>()
                : Observable.Return(t.newValue)
        ));

问题的核心在于,对于第一个特殊值,您希望暂时停止收听常规的 source observable,并切换到如下所示的内容:

source.SkipUntil(observeSpecialsUntilEstablished).TakeWhile(i => i.isSpecial).Select(i => i.value)

当你有那个特殊的 observable 监听时,你可以忽略你的常规监听并发出 Observable.Empty&lt;T&gt;()。当您没有特殊值时,您实际上是在执行source.SelectMany(i =&gt; Observable.Return(i)),这是返回source 的无操作。

其他一切都是装饰:ZipPublishStartWith 可以轻松与之前的值进行比较。如果您愿意,可以将其抽象出来。将所有内容放在该命名元组中是为了帮助进行自我记录并防止重新调用 specialDetector,以防万一这是一项昂贵的操作。

【讨论】:

有趣的方法。此实现假定observeSpecialsUntilEstablished 参数是冷序列,每次订阅时都会重新启动。更安全的方法是将其替换为 Func&lt;IObservable&lt;U&gt;&gt; 参数,并在每次调用 SkipUntil 时创建一个新序列。 感谢你们两位非常完整和有趣的方法。两者都符合我的需要,我决定接受@TheodorZoulias 的回答,因为我觉得它更简洁易懂。 @TheodorZoulias,不一定是冷序列。它也可以是一个热门的。虽然寒冷是常见的情况。 @TheodorZoulias Func&lt;IObservable&lt;U&gt;IObservable&lt;U&gt; 更有用的场景相差甚远。 API 通常像我一样使用 Observable 参数作为标记,而不提供 Func&lt;IObservable&gt; 重载。 我想到了Window 运算符,它有一个接受Func&lt;TWindowOpening, IObservable&lt;TWindowClosing&gt;&gt; windowClosingSelector 参数的重载。这种具体情况是否需要类似处理,取决于FilterSpecials&lt;T, U&gt;算子的使用场景。传递Observable.Timer 有效,因为这个序列在设计上恰好是冷的。如果它很热,它不会产生理想的行为。

以上是关于Rx.Net 运算符忽略某些值,直到它们在一定时间内相同的主要内容,如果未能解决你的问题,请参考以下文章

是否可以将 Rx“使用”运算符与 IAsyncDisposable 一起使用?

是否可以将 Rx“使用”运算符与 IAsyncDisposable 一起使用?

是否可以告诉 Scintilla 忽略某些击键并将它们传递给父窗口?

Rx.NET 中是不是存在功能类似于 BehaviorSubject 但仅在值发生更改时才发出的 Subject 实现?

有没有办法用 Decodable 忽略某些键,只提取它们的值?

在 bash 脚本中执行命令,直到输出超过某个值