Reactive Throttle 返回在 TimeSpan 内添加的所有项目
Posted
技术标签:
【中文标题】Reactive Throttle 返回在 TimeSpan 内添加的所有项目【英文标题】:Reactive Throttle returning all items added within the TimeSpan 【发布时间】:2012-02-09 14:40:39 【问题描述】:给定IObservable<T>
,有没有办法使用Throttle
行为(添加项目时重置计时器,但让它返回该时间内添加的所有项目的集合?
Buffer
提供了类似的功能,它在每个时间跨度或计数上将数据分块到IList<T>
。但是每次添加项目时我都需要时间来重置。
我在这里看到了一个类似的问题,Does reactive extensions support rolling buffers?,但答案似乎并不理想,而且它有点旧,所以我想知道 Rx-Main 的发行版现在是否支持该功能。
【问题讨论】:
这听起来像我在***.com/a/7604825/259769 中的BufferWithInactivity
答案是您所要求的。你能澄清你的问题吗?
@Enigmativity 是的,这正是我所追求的功能。我在我的问题中引用了这个问题 :) 但我不喜欢那个答案,回答者明确表示它正在进行中。
不确定你在问什么。如果每次“添加”(传播?)项目时计时器都会重置,那么首先如何缓冲任何内容?
【参考方案1】:
正如我answered in the other post,是的,你可以!使用Observable
的Throttle
和Window
方法:
public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)
var closes = stream.Throttle(delay);
return stream.Window(() => closes).SelectMany(window => window.ToList());
【讨论】:
不错的答案!但是不应该使用return stream.Publish(hot =>...
) 发布流,以避免两次订阅冷可观察对象吗?【参考方案2】:
我修改了上校 Panic 的 BufferUntilInactive
运算符,添加了一个 Publish
组件,这样它也可以与冷可观察对象一起正常工作:
/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)
scheduler ??= Scheduler.Default;
return source.Publish(published =>
published
.Window(() => published.Throttle(dueTime, scheduler))
.SelectMany(window => window.ToList())
);
为了完整起见,我还添加了一个可选的IScheduler
参数,用于配置运行计时器的调度程序。
【讨论】:
对于同样具有maxCount
参数的BufferUntilInactive
变体,您可以查看here。【参考方案3】:
它不适合
Observable.BufferWithTimeOrCount<TSource> Method (IObservable<TSource>, TimeSpan, Int32)
?
【讨论】:
不,时间组件将立即启动并缓冲所有内容,直到该时间过去,并且计数将缓冲 n 个项目。添加项目时,我需要重置缓冲时间(例如 Throttle)。以上是关于Reactive Throttle 返回在 TimeSpan 内添加的所有项目的主要内容,如果未能解决你的问题,请参考以下文章
Laravel最佳实践--API请求频率限制(Throttle中间件)