一种以均匀间隔推送缓冲事件的方法
Posted
技术标签:
【中文标题】一种以均匀间隔推送缓冲事件的方法【英文标题】:A way to push buffered events in even intervals 【发布时间】:2011-05-06 14:36:56 【问题描述】:我想要实现的是缓冲来自一些 IObservable 的传入事件(它们以突发形式出现)并进一步释放它们,但以均匀的间隔一个接一个地释放它们。 像这样:
-oo-ooo-oo------------------oooo-oo-o-------------->
-o--o--o--o--o--o--o--------o--o--o--o--o--o--o---->
由于我对 Rx 很陌生,我不确定是否已经有一个主题或运算符可以做到这一点。也许可以通过组合来完成?
更新:
感谢 Richard Szalay 指出 Drain 运算符,我发现另一个 example by James Miles 使用 Drain 运算符。以下是我设法让它在 WPF 应用程序中工作的方法:
.Drain(x =>
Process(x);
return Observable.Return(new Unit())
.Delay(TimeSpan.FromSeconds(1), Scheduler.Dispatcher );
).Subscribe();
我玩得很开心,因为省略 scheduler 参数会导致应用程序在调试模式下崩溃,而不会出现任何异常(我需要学习如何处理 Rx 中的异常)。 Process 方法直接修改 UI 状态,但我想用它制作 IObservable 非常简单(使用 ISubject?)。
更新:
与此同时,我一直在尝试使用 ISubject,下面的类可以满足我的需求 - 它会及时释放缓冲的 T:
public class StepSubject<T> : ISubject<T>
IObserver<T> subscriber;
Queue<T> queue = new Queue<T>();
MutableDisposable cancel = new MutableDisposable();
TimeSpan interval;
IScheduler scheduler;
bool idle = true;
public StepSubject(TimeSpan interval, IScheduler scheduler)
this.interval = interval;
this.scheduler = scheduler;
void Step()
T next;
lock (queue)
idle = queue.Count == 0;
if (!idle)
next = queue.Dequeue();
if (!idle)
cancel.Disposable = scheduler.Schedule(Step, interval);
subscriber.OnNext(next);
public void OnNext(T value)
lock (queue)
queue.Enqueue(value);
if (idle)
cancel.Disposable = scheduler.Schedule(Step);
public IDisposable Subscribe(IObserver<T> observer)
subscriber = observer;
return cancel;
为了清楚起见,从 OnCompleted 和 OnError 中删除了这个幼稚的实现,也只允许单个订阅。
【问题讨论】:
您的主题不是线程安全的。您正在检查您的队列在锁内是否为空,然后在锁外出列。 【参考方案1】:它实际上比听起来更狡猾。
使用Delay
不起作用,因为值仍会批量发生,只是稍微延迟。
将Interval
与CombineLatest
或Zip
一起使用都不起作用,因为前者会导致源值被跳过,而后者会缓冲间隔值。
我认为新的 Drain
运算符 (added in 1.0.2787.0) 结合 Delay
应该可以解决问题:
source.Drain(x => Observable.Empty<int>().Delay(TimeSpan.FromSeconds(1)).StartWith(x));
Drain
运算符的工作方式与SelectMany
类似,但要等到前一个输出完成后,才会使用下一个值调用选择器。 它仍然不是完全你所追求的(块中的第一个值也会延迟),但它很接近:上面的用法现在与你的弹珠图匹配。
编辑: 显然框架中的Drain
不像SelectMany
那样工作。我会在官方论坛寻求一些建议。同时,这里有一个 Drain 的实现,可以满足您的需求:
编辑 09/11:修复了实施中的错误并更新了用法以匹配您请求的大理石图。
public static class ObservableDrainExtensions
public static IObservable<TOut> Drain<TSource, TOut>(this IObservable<TSource> source,
Func<TSource, IObservable<TOut>> selector)
return Observable.Defer(() =>
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v => selector(v)
.Do(_ => , () => queue.OnNext(new Unit()))
);
);
【讨论】:
感谢您的回答! Drain 很接近,但主要问题是它不返回 IObservableObservable.Empty...
),那么它应该可以在第一项上正常工作(与您的原始大理石图匹配)
我们现在应该使用规范的Drain
运算符还是您在此处描述的扩展方法?【参考方案2】:
为了完整起见,这里是 Richard 建议的 Drain() 方法的替代(更紧凑)版本:
public static IObservable<T2> SelectManySequential<T1, T2>(
this IObservable<T1> source,
Func<T1, IObservable<T2>> selector
)
return source
.Select(x => Observable.Defer<T2>(() => selector(x)))
.Concat();
请参阅 Rx 论坛中的主题 Drain + SelectMany = ?。
更新: 我意识到我使用的 Concat() 重载是我个人的 Rx 扩展之一,它(还不是)框架的一部分。我很抱歉这个错误.. 当然,这让我的解决方案没有我想象的那么优雅。
不过为了完整起见,我在这里发布了我的 Conact() 扩展方法重载:
public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
return Observable.CreateWithDisposable<T>(o =>
var lockCookie = new Object();
bool completed = false;
bool subscribed = false;
var waiting = new Queue<IObservable<T>>();
var pendingSubscription = new MutableDisposable();
Action<Exception> errorHandler = e =>
o.OnError(e);
pendingSubscription.Dispose();
;
Func<IObservable<T>, IDisposable> subscribe = null;
subscribe = (ob) =>
subscribed = true;
return ob.Subscribe(
o.OnNext,
errorHandler,
() =>
lock (lockCookie)
if (waiting.Count > 0)
pendingSubscription.Disposable = subscribe(waiting.Dequeue());
else if (completed)
o.OnCompleted();
else
subscribed = false;
);
;
return new CompositeDisposable(pendingSubscription,
source.Subscribe(
n =>
lock (lockCookie)
if (!subscribed)
pendingSubscription.Disposable = subscribe(n);
else
waiting.Enqueue(n);
,
errorHandler
, () =>
lock (lockCookie)
completed = true;
if (!subscribed)
o.OnCompleted();
)
);
);
现在用我自己的武器打败自己: 同样的 Concat() 方法可以用 Richard Szalay 的绝妙方式编写得更加优雅:
public static IObservable<T> Concat<T>(this IObservable<IObservable<T>> source)
return Observable.Defer(() =>
BehaviorSubject<Unit> queue = new BehaviorSubject<Unit>(new Unit());
return source
.Zip(queue, (v, q) => v)
.SelectMany(v =>
v.Do(_ => , () => queue.OnNext(new Unit()))
);
);
所以功劳属于理查德。 :-)
【讨论】:
确实,优雅而精彩!掌握这种“短路”方式花了我一些时间,但当我终于明白它时的满足感是无价的:)【参考方案3】:我是这样做的,只使用了一个显式队列(ReactiveCollection 只是 WPF 的 ObservableCollection 的一个奇特版本 - ReactiveCollection.ItemsAdded OnNext 的每个添加的项目,如您所想):
https://github.com/xpaulbettsx/ReactiveXaml/blob/master/ReactiveXaml/ReactiveCollection.cs#L309
public static ReactiveCollection<T> CreateCollection<T>(this IObservable<T> FromObservable, TimeSpan? WithDelay = null)
var ret = new ReactiveCollection<T>();
if (WithDelay == null)
FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(ret.Add);
return ret;
// On a timer, dequeue items from queue if they are available
var queue = new Queue<T>();
var disconnect = Observable.Timer(WithDelay.Value, WithDelay.Value)
.ObserveOn(RxApp.DeferredScheduler).Subscribe(_ =>
if (queue.Count > 0)
ret.Add(queue.Dequeue());
);
// When new items come in from the observable, stuff them in the queue.
// Using the DeferredScheduler guarantees we'll always access the queue
// from the same thread.
FromObservable.ObserveOn(RxApp.DeferredScheduler).Subscribe(queue.Enqueue);
// This is a bit clever - keep a running count of the items actually
// added and compare them to the final count of items provided by the
// Observable. Combine the two values, and when they're equal,
// disconnect the timer
ret.ItemsAdded.Scan0(0, ((acc, _) => acc+1)).Zip(FromObservable.Aggregate(0, (acc,_) => acc+1),
(l,r) => (l == r)).Where(x => x != false).Subscribe(_ => disconnect.Dispose());
return ret;
【讨论】:
以上是关于一种以均匀间隔推送缓冲事件的方法的主要内容,如果未能解决你的问题,请参考以下文章
时间序列分析 - 不均匀间隔测量 - pandas + statsmodels