一种以均匀间隔推送缓冲事件的方法

Posted

技术标签:

【中文标题】一种以均匀间隔推送缓冲事件的方法【英文标题】:A way to push buffered events in even intervals 【发布时间】:2011-05-06 14:36:56 【问题描述】:

我想要实现的是缓冲来自一些 IObservable 的传入事件(它们以突发形式出现)并进一步释放它们,但以均匀的间隔一个接一个地释放它们。 像这样:

-oo-ooo-oo------------------oooo-oo-o-------------->

-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->

由于我对 Rx 很陌生,我不确定是否已经有一个主题或运算符可以做到这一点。也许可以通过组合来完成?

更新:

感谢 Richard Szalay 指出 Drain 运算符,我发现另一个 example by James Miles 使用 Drain 运算符。以下是我设法让它在 WPF 应用程序中工作的方法:

    .Drain(x => 
        Process(x);
        return Observable.Return(new Unit())
            .Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
    ).Subscribe();

我玩得很开心,因为省略 scheduler 参数会导致应用程序在调试模式下崩溃,而不会出现任何异常(我需要学习如何处理 Rx 中的异常)。 Process 方法直接修改 UI 状态,但我想用它制作 IObservable 非常简单(使用 ISubject?)。

更新:

与此同时,我一直在尝试使用 ISubject,下面的类可以满足我的需求 - 它会及时释放缓冲的 T:

public class StepSubject<T> : ISubject<T>

    IObserver<T> subscriber;
    Queue<T> queue = new Queue<T>();
    MutableDisposable cancel = new MutableDisposable();
    TimeSpan interval;
    IScheduler scheduler;
    bool idle = true;

    public StepSubject(TimeSpan interval, IScheduler scheduler)
    
        this.interval = interval;
        this.scheduler = scheduler;
    

    void Step()
    
        T next;
        lock (queue)
        
            idle = queue.Count == 0;
            if (!idle)
                next = queue.Dequeue();
        

        if (!idle)
        
            cancel.Disposable = scheduler.Schedule(Step, interval);
            subscriber.OnNext(next);
        
    

    public void OnNext(T value)
    
        lock (queue)
            queue.Enqueue(value);

        if (idle)
            cancel.Disposable = scheduler.Schedule(Step);
    

    public IDisposable Subscribe(IObserver<T> observer)
    
        subscriber = observer;
        return cancel;
    

为了清楚起见,从 OnCompleted 和 OnError 中删除了这个幼稚的实现,也只允许单个订阅。

【问题讨论】:

您的主题不是线程安全的。您正在检查您的队列在锁内是否为空,然后在锁外出列。 【参考方案1】:

它实际上比听起来更狡猾。

使用Delay 不起作用,因为值仍会批量发生,只是稍微延迟。

IntervalCombineLatestZip 一起使用都不起作用,因为前者会导致源值被跳过,而后者会缓冲间隔值。

我认为新的 Drain 运算符 (added in 1.0.2787.0) 结合 Delay 应该可以解决问题:

source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));

Drain 运算符的工作方式与SelectMany 类似,但要等到前一个输出完成后,才会使用下一个值调用选择器。 它仍然不是完全你所追求的(块中的第一个值也会延迟),但它很接近:上面的用法现在与你的弹珠图匹配。

编辑: 显然框架中的Drain 不像SelectMany 那样工作。我会在官方论坛寻求一些建议。同时,这里有一个 Drain 的实现,可以满足您的需求:

编辑 09/11:修复了实施中的错误并更新了用法以匹配您请求的大理石图。

public static class ObservableDrainExtensions

    public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source, 
        Func<TSource, IObservable<TOut>> selector)
    
        return Observable.Defer(() =>
        
            BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());

            return source
                .Zip(queue, (v, q) => v)
                .SelectMany(v => selector(v)
                    .Do(_ =>  , () => queue.OnNext(new Unit()))
                );
        );
    

【讨论】:

感谢您的回答! Drain 很接近,但主要问题是它不返回 IObservable,所以我不能在查询中使用它或将它与其他运算符链接。它期望 Func 参数返回一个 IObservable,这会阻止您的示例编译。 不幸的是,我在编译该示例时也遇到了问题:( 感谢您的更新!我尝试将该函数与 .Drain(s => Observable.Return(s).Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher)) 一起使用,除了延迟第一个通知外,它还可以工作。 没问题。如果您使用更新后的用法(Observable.Empty...),那么它应该可以在第一项上正常工作(与您的原始大理石图匹配) 我们现在应该使用规范的Drain 运算符还是您在此处描述的扩展方法?【参考方案2】:

为了完整起见,这里是 Richard 建议的 Drain() 方法的替代(更紧凑)版本:

public static IObservable<T2> SelectManySequential<T1, T2>(
    this IObservable<T1> source, 
    Func<T1, IObservable<T2>> selector
)

    return source
        .Select(x => Observable.Defer<T2>(() => selector(x)))
        .Concat();

请参阅 Rx 论坛中的主题 Drain + SelectMany = ?。

更新: 我意识到我使用的 Concat() 重载是我个人的 Rx 扩展之一,它(还不是)框架的一部分。我很抱歉这个错误.. 当然,这让我的解决方案没有我想象的那么优雅。

不过为了完整起见,我在这里发布了我的 Conact() 扩展方法重载:

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)

    return Observable.CreateWithDisposable<T>(o =>
    
        var lockCookie = new Object();
        bool completed = false;
        bool subscribed = false;
        var waiting = new Queue<IObservable<T>>();
        var pendingSubscription = new MutableDisposable();

        Action<Exception> errorHandler = e =>
        
            o.OnError(e);
            pendingSubscription.Dispose();
        ;

        Func<IObservable<T>, IDisposable> subscribe = null;
        subscribe = (ob) =>
        
            subscribed = true;
            return ob.Subscribe(
                o.OnNext,
                errorHandler,
                () =>
                
                    lock (lockCookie)
                    
                        if (waiting.Count > 0)
                            pendingSubscription.Disposable = subscribe(waiting.Dequeue());
                        else if (completed)
                            o.OnCompleted();
                        else
                            subscribed = false;
                    
                
            );
        ;

        return new CompositeDisposable(pendingSubscription,
            source.Subscribe(
                n =>
                
                    lock (lockCookie)
                    
                        if (!subscribed)
                            pendingSubscription.Disposable = subscribe(n);
                        else
                            waiting.Enqueue(n);
                    

                ,
                errorHandler
                , () =>
                
                    lock (lockCookie)
                    
                        completed = true;
                        if (!subscribed)
                            o.OnCompleted();
                    
                
            )
        );
    );

现在用我自己的武器打败自己: 同样的 Concat() 方法可以用 Richard Szalay 的绝妙方式编写得更加优雅:

public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)

    return Observable.Defer(() =>
    
        BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
        return source
            .Zip(queue, (v, q) => v)
            .SelectMany(v => 
                v.Do(_ =>  , () => queue.OnNext(new Unit()))
            );
    );

所以功劳属于理查德。 :-)

【讨论】:

确实,优雅而精彩!掌握这种“短路”方式花了我一些时间,但当我终于明白它时的满足感是无价的:)【参考方案3】:

我是这样做的,只使用了一个显式队列(ReactiveCollection 只是 WPF 的 ObservableCollection 的一个奇特版本 - ReactiveCollection.ItemsAdded OnNext 的每个添加的项目,如您所想):

https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309

public static ReactiveCollection<T> CreateCollection<T>(this IObservable<T> FromObservable, TimeSpan? WithDelay = null)

    var ret = new ReactiveCollection<T>();
    if (WithDelay == null) 
        FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add);
        return ret;
    

    // On a timer, dequeue items from queue if they are available
    var queue = new Queue<T>();
    var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value)
        .ObserveOn(RxApp.DeferredScheduler).Subscribe(_ => 
            if (queue.Count > 0)  
                ret.Add(queue.Dequeue());
            
        );

    // When new items come in from the observable, stuff them in the queue.
    // Using the DeferredScheduler guarantees we'll always access the queue
    // from the same thread.
    FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue);

    // This is a bit clever - keep a running count of the items actually 
    // added and compare them to the final count of items provided by the
    // Observable. Combine the two values, and when they're equal, 
    // disconnect the timer
    ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1), 
        (l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose());

    return ret;

【讨论】:

以上是关于一种以均匀间隔推送缓冲事件的方法的主要内容,如果未能解决你的问题,请参考以下文章

连续的小部件没有均匀地间隔开

时间序列分析 - 不均匀间隔测量 - pandas + statsmodels

时间序列分析 - 不均匀间隔测量 - pandas + statsmodels

如何用python将图片分成均匀间隔的正方形?

均匀间隔水平导航项

Glut:如何为均匀运动生成恒定的时间间隔?