反应式扩展是不是支持滚动缓冲区?
Posted
技术标签:
【中文标题】反应式扩展是不是支持滚动缓冲区?【英文标题】:Does reactive extensions support rolling buffers?反应式扩展是否支持滚动缓冲区? 【发布时间】:2011-11-27 16:57:30 【问题描述】:我正在使用响应式扩展将数据整理到 100 毫秒的缓冲区中:
this.subscription = this.dataService
.Where(x => !string.Equals("FOO", x.Key.Source))
.Buffer(TimeSpan.FromMilliseconds(100))
.ObserveOn(this.dispatcherService)
.Where(x => x.Count != 0)
.Subscribe(this.OnBufferReceived);
这很好用。但是,我想要的行为与 Buffer
操作提供的行为略有不同。本质上,如果收到另一个数据项,我想重置计时器。只有当整个 100 毫秒都没有收到数据时,我才想处理它。这开启了 从不 处理数据的可能性,因此我也应该能够指定最大计数。我会想象一些类似的东西:
.SlidingBuffer(TimeSpan.FromMilliseconds(100), 10000)
我环顾四周,在 Rx 中找不到类似的东西?任何人都可以确认/否认这一点吗?
【问题讨论】:
我确信我在 Rx 的一个教程视频中看到了这种行为,但恐怕我不记得是什么或确切的位置。 :( 啊,节流阀 (msdn.microsoft.com/en-us/library/hh229298%28v=vs.103%29.aspx) 是我在想的,但我认为它本身并不能满足您的需求。不确定是否有某种方法可以将它结合起来做想要的事情...... 【参考方案1】:这可以通过结合Observable
的内置Window
和Throttle
方法来实现。首先,让我们解决一个忽略最大计数条件的简单问题:
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
var closes = stream.Throttle(delay);
return stream.Window(() => closes).SelectMany(window => window.ToList());
强大的Window
method 完成了繁重的工作。现在很容易了解如何添加最大计数:
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay, Int32? max=null)
var closes = stream.Throttle(delay);
if (max != null)
var overflows = stream.Where((x,index) => index+1>=max);
closes = closes.Merge(overflows);
return stream.Window(() => closes).SelectMany(window => window.ToList());
我会在我的博客上写一篇文章来解释这一点。 https://gist.github.com/2244036
Window 方法的文档:
http://leecampbell.blogspot.co.uk/2011/03/rx-part-9join-window-buffer-and-group.html http://enumeratethis.com/2011/07/26/financial-charts-reactive-extensions/【讨论】:
使用上述 BufferUntilInactive 场景 - 如果订阅者比生产者慢,您可能会看到下一组窗口项将被缓冲并且不会被推送到订阅者的场景,除非生成一个项... 我附上了一个样本snipt.org/Bhao0。在 Visual Studio (1) 打开输出窗口 (2) 检查挂起按钮 (3) 单击按钮 (4) 等待它在控制台上打印“立即单击”。 (5)按三下按钮,你会看到那三下都漏掉了。 这个解决方案,就目前而言,仅适用于热序列。为了使其也适用于冷序列,应添加Publish
组件,如here 所示。【参考方案2】:
我写了一个扩展来完成你所追求的大部分事情 - BufferWithInactivity
。
这里是:
public static IObservable<IEnumerable<T>> BufferWithInactivity<T>(
this IObservable<T> source,
TimeSpan inactivity,
int maximumBufferSize)
return Observable.Create<IEnumerable<T>>(o =>
var gate = new object();
var buffer = new List<T>();
var mutable = new SerialDisposable();
var subscription = (IDisposable)null;
var scheduler = Scheduler.ThreadPool;
Action dump = () =>
var bts = buffer.ToArray();
buffer = new List<T>();
if (o != null)
o.OnNext(bts);
;
Action dispose = () =>
if (subscription != null)
subscription.Dispose();
mutable.Dispose();
;
Action<Action<IObserver<IEnumerable<T>>>> onErrorOrCompleted =
onAction =>
lock (gate)
dispose();
dump();
if (o != null)
onAction(o);
;
Action<Exception> onError = ex =>
onErrorOrCompleted(x => x.OnError(ex));
Action onCompleted = () => onErrorOrCompleted(x => x.OnCompleted());
Action<T> onNext = t =>
lock (gate)
buffer.Add(t);
if (buffer.Count == maximumBufferSize)
dump();
mutable.Disposable = Disposable.Empty;
else
mutable.Disposable = scheduler.Schedule(inactivity, () =>
lock (gate)
dump();
);
;
subscription =
source
.ObserveOn(scheduler)
.Subscribe(onNext, onError, onCompleted);
return () =>
lock (gate)
o = null;
dispose();
;
);
【讨论】:
+1 谢谢。你写这个是为了这个问题还是为了你自己?是否已在生产代码中使用? @KentBoogaart - 我几个月前写的,但它还没有在生产代码中。它仍然是一个 WIP。【参考方案3】:使用 Rx Extensions 2.0,您可以通过接受超时和大小的新缓冲区重载来满足这两个要求:
this.subscription = this.dataService
.Where(x => !string.Equals("FOO", x.Key.Source))
.Buffer(TimeSpan.FromMilliseconds(100), 1)
.ObserveOn(this.dispatcherService)
.Where(x => x.Count != 0)
.Subscribe(this.OnBufferReceived);
有关文档,请参阅 https://msdn.microsoft.com/en-us/library/hh229200(v=vs.103).aspx。
【讨论】:
但这不会有滑动窗口,具有所要求的那种“去抖动”行为? @Cocowalla 我重新阅读了原始问题,我提供的代码确实满足了所有要求。我已在生产代码中使用它并取得了巨大成功。 抱歉,我的意思是去抖动行为:“如果收到另一个数据项,我想重置计时器” - 我没有看到您的代码这样做? AFAICS,您的代码将始终每 100 毫秒将缓冲区推送给订阅者(只要它不为空) 我现在明白你所说的去抖动是什么意思了,我认为我对这个词的理解更像是reactivex.io/documentation/operators/debounce.html,也就是Observable.Throttle
。你问的更复杂,但我想可以用Observable.Window
来完成。无论如何,除非我遗漏了什么,否则我的答案与该问题上接受的答案完全相同。
不,这个答案的行为与接受的答案不同。正确接受的答案(根据要求)在源 observable 持续活动的情况下推迟发送缓冲区。这个答案只是每 100 毫秒发出一次缓冲区。【参考方案4】:
我猜这可以在 Buffer 方法之上实现,如下所示:
public static IObservable<IList<T>> SlidingBuffer<T>(this IObservable<T> obs, TimeSpan span, int max)
return Observable.CreateWithDisposable<IList<T>>(cl =>
var acc = new List<T>();
return obs.Buffer(span)
.Subscribe(next =>
if (next.Count == 0) //no activity in time span
cl.OnNext(acc);
acc.Clear();
else
acc.AddRange(next);
if (acc.Count >= max) //max items collected
cl.OnNext(acc);
acc.Clear();
, err => cl.OnError(err), () => cl.OnNext(acc); cl.OnCompleted(); );
);
注意:我还没有测试过,但我希望它能给你一些想法。
【讨论】:
【参考方案5】:Panic 上校的solution 几乎是完美的。唯一缺少的是 Publish
组件,以便使解决方案也适用于冷序列。
/// <summary>
/// Projects each element of an observable sequence into a buffer that's sent out
/// when either a given inactivity timespan has elapsed, or it's full,
/// using the specified scheduler to run timers.
/// </summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
this IObservable<T> source, TimeSpan dueTime, int maxCount,
IScheduler scheduler = default)
if (maxCount < 1) throw new ArgumentOutOfRangeException(nameof(maxCount));
scheduler ??= Scheduler.Default;
return source.Publish(published =>
var combinedBoundaries = Observable.Merge
(
published.Throttle(dueTime, scheduler),
published.Skip(maxCount - 1)
);
return published
.Window(() => combinedBoundaries)
.SelectMany(window => window.ToList());
);
除了添加Publish
,我还将原来的.Where((_, index) => index + 1 >= maxCount)
替换为等效但更短的.Skip(maxCount - 1)
。为了完整起见,还有一个IScheduler
参数,用于配置运行定时器的调度程序。
【讨论】:
对于不包含maxCount
参数的更简单的BufferUntilInactive
变体,您可以查看here。以上是关于反应式扩展是不是支持滚动缓冲区?的主要内容,如果未能解决你的问题,请参考以下文章