Reactive Throttle 返回在 TimeSpan 内添加的所有项目

Posted

技术标签:

【中文标题】Reactive Throttle 返回在 TimeSpan 内添加的所有项目【英文标题】:Reactive Throttle returning all items added within the TimeSpan 【发布时间】:2012-02-09 14:40:39 【问题描述】:

给定IObservable<T>,有没有办法使用Throttle 行为(添加项目时重置计时器,但让它返回该时间内添加的所有项目的集合?

Buffer 提供了类似的功能,它在每个时间跨度或计数上将数据分块到IList<T>。但是每次添加项目时我都需要时间来重置。

我在这里看到了一个类似的问题,Does reactive extensions support rolling buffers?,但答案似乎并不理想,而且它有点旧,所以我想知道 Rx-Main 的发行版现在是否支持该功能。

【问题讨论】:

这听起来像我在***.com/a/7604825/259769 中的BufferWithInactivity 答案是您所要求的。你能澄清你的问题吗? @Enigmativity 是的,这正是我所追求的功能。我在我的问题中引用了这个问题 :) 但我不喜欢那个答案,回答者明确表示它正在进行中。 不确定你在问什么。如果每次“添加”(传播?)项目时计时器都会重置,那么首先如何缓冲任何内容? 【参考方案1】:

正如我answered in the other post,是的,你可以!使用ObservableThrottleWindow方法:

public static IObservable<IList<T>> BufferUntilInactive<T>(this IObservable<T> stream, TimeSpan delay)

    var closes = stream.Throttle(delay);
    return stream.Window(() => closes).SelectMany(window => window.ToList());

【讨论】:

不错的答案!但是不应该使用return stream.Publish(hot =&gt;...) 发布流,以避免两次订阅冷可观察对象吗?【参考方案2】:

我修改了上校 Panic 的 BufferUntilInactive 运算符,添加了一个 Publish 组件,这样它也可以与冷可观察对象一起正常工作:

/// <summary>Projects each element of an observable sequence into consecutive
/// non-overlapping buffers, which are produced based on time and activity,
/// using the specified scheduler to run timers.</summary>
public static IObservable<IList<T>> BufferUntilInactive<T>(
    this IObservable<T> source, TimeSpan dueTime, IScheduler scheduler = default)

    scheduler ??= Scheduler.Default;
    return source.Publish(published =>
        published
            .Window(() => published.Throttle(dueTime, scheduler))
            .SelectMany(window => window.ToList())
    );

为了完整起见,我还添加了一个可选的IScheduler 参数,用于配置运行计时器的调度程序。

【讨论】:

对于同样具有maxCount 参数的BufferUntilInactive 变体,您可以查看here。【参考方案3】:

它不适合

Observable.BufferWithTimeOrCount&lt;TSource&gt; Method (IObservable&lt;TSource&gt;, TimeSpan, Int32)?

【讨论】:

不,时间组件将立即启动并缓冲所有内容,直到该时间过去,并且计数将缓冲 n 个项目。添加项目时,我需要重置缓冲时间(例如 Throttle)。

以上是关于Reactive Throttle 返回在 TimeSpan 内添加的所有项目的主要内容,如果未能解决你的问题,请参考以下文章

Laravel最佳实践--API请求频率限制(Throttle中间件)

vue3.0 reactive函数

JS debounce和throttle 去抖和节流

Reactive.NET - 从 .Subscribe 返回通用对象

函数节流(throttle)与函数去抖(debounce)

js 消抖(debounce)与节流(throttle)