Rx Throttle(...).ObserveOn(scheduler) 和 Throttle(..., scheduler) 的区别

Posted

技术标签:

【中文标题】Rx Throttle(...).ObserveOn(scheduler) 和 Throttle(..., scheduler) 的区别【英文标题】:The difference between Rx Throttle(...).ObserveOn(scheduler) and Throttle(..., scheduler) 【发布时间】:2015-05-03 04:13:31 【问题描述】:

我有以下代码:

IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50), RxApp.MainThreadScheduler)
                                       .Subscribe(_ => UpdateUi());

正如预期的那样,UpdateUi() 将始终在主线程上执行。当我将代码更改为

IDisposable subscription = myObservable.Throttle(TimeSpan.FromMilliseconds(50))
                                       .ObserveOn(RxApp.MainThreadScheduler)
                                       .Subscribe(_ => UpdateUi());

UpdateUI() 将在后台线程中执行。

为什么Throttle(...).ObserveOn(scheduler) 不等于Throttle(..., scheduler)

【问题讨论】:

【参考方案1】:

在你给UpdateUi 的代码中的两个示例中,总是在RxApp.MainThreadScheduler 指定的调度程序上调用。我可以肯定地说这一点,因为ObserveOn 是一个装饰器,可确保在指定的调度程序上调用订阅者的OnNext 处理程序。请参阅here 进行深入分析。

这么说,这有点令人费解。 RxApp.MainThreadScheduler 不是指正确的调度程序调度程序,或者UpdateUi 正在从调度程序线程转换。前者并非史无前例 - 请参阅 https://github.com/reactiveui/ReactiveUI/issues/768 其他人遇到过的情况。我不知道在那种情况下问题是什么。也许@PaulBetts 可以参与进来,或者您可以在https://github.com/reactiveui/ 提出问题。无论如何,我会在这里仔细检查您的假设,因为我希望这是一个经过良好测试的领域。你有完整的复制品吗?

至于你的具体问题,Throttle(...).ObserveOn(scheduler)Throttle(..., scheduler)的区别如下:

在第一种情况下,当 Throttle 指定时没有调度程序,它将使用默认平台调度程序来引入运行它的计时器所需的并发性 - 在 WPF 上,这将使用线程池线程。所以所有的限制都将在后台线程上完成,并且由于以下ObserveOn,释放的事件只会传递给指定调度程序上的订阅者。

Throttle 指定调度程序的情况下,在该调度程序上进行限制 - 抑制事件和释放事件都将在该调度程序上进行管理,并且订阅者也将在同一调度程序上调用。

因此,无论哪种方式,UpdateUi 都会在 RxApp.MainThreadScheduler 上被调用。

在大多数情况下,最好不要在调度程序上限制 ui 事件,因为如果只有一小部分事件会通过限制,在后台线程上运行单独的计时器并支付上下文切换的成本通常会更高。

所以,为了检查您没有遇到RxApp.MainThreadScheduler 的问题,我会尝试通过另一种方式明确指定调度程序或SynchronizationContext。如何做到这一点取决于您所在的平台 - ObserveOnDispatcher() 希望可用,或使用合适的 ObserveOn 重载。如果导入了正确的 Rx 库,则可以选择控件、同步上下文和调度程序。

【讨论】:

谢谢,原来我的MainThreadScheduler 似乎有问题(见***.com/a/28827962/167251) 我刚刚发现,there's a bit more about the difference between the two。【参考方案2】:

经过一番调查,我认为这是由于运行时使用的 Rx 版本与我预期的不同(我为第三方应用程序开发了一个插件)。

我不知道为什么,但似乎默认的RxApp.MainThreadScheduler 无法正确初始化。默认实例是WaitForDispatcherScheduler (source)。这个类的所有函数都依赖attemptToCreateScheduler:

    IScheduler attemptToCreateScheduler()
    
        if (_innerScheduler != null) return _innerScheduler;
        try 
            _innerScheduler = _schedulerFactory();
            return _innerScheduler;
         catch (Exception) 
            // NB: Dispatcher's not ready yet. Keep using CurrentThread
            return CurrentThreadScheduler.Instance;
        
    

在我的情况下似乎发生的是 _schedulerFactory() 抛出,导致 CurrentThreadScheduler.Instance 被返回。

通过手动将RxApp.MainThreadScheduler 初始化为new SynchronizationContextScheduler(SynchronizationContext.Current) 行为符合预期。

【讨论】:

【参考方案3】:

我刚刚遇到一个问题,首先导致我提出这个问题,然后进行了一些实验。

事实证明,Throttle(timeSpan, scheduler) 足够聪明地“取消”已经安排好的去抖事件X,以防源发出另一个事件Y 之前 X 被观察到。因此,最终只会观察到Y(假设它是最后一个去抖事件)。

使用Throttle(timeSpan).ObserveOn(scheduler)XY 都将被观察到。

因此,从概念上讲,这是两种方法之间的重要区别。遗憾的是,Rx.NET 文档很少,但我相信这种行为是设计使然,对我来说很有意义。

用一个例子来说明这一点 (fiddle):

#nullable enable
using System;
using System.Threading;
using System.Threading.Tasks;
using System.Diagnostics;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using static System.Console;

public class Program

    static async Task ThrottleWithScheduler()
    
        WriteLine($"\nnameof(ThrottleWithScheduler)\n");

        var sc = new CustomSyncContext();
        var scheduler = new SynchronizationContextScheduler(sc);
        var subj = new BehaviorSubject<string>("A");

        subj
            .Do(v => WriteLine($"Emitted v on sc.Elapsedms"))
            .Throttle(TimeSpan.FromMilliseconds(500), scheduler)
            .Subscribe(v => WriteLine($"Observed v on sc.Elapsedms"));

        await Task.Delay(100);
        subj.OnNext("B");
        await Task.Delay(200);
        subj.OnNext("X");
        await Task.Delay(550);
        subj.OnNext("Y");

        await Task.Delay(2000);
        WriteLine("Finished!");
    

    static async Task ThrottleWithObserveOn()
    
        WriteLine($"\nnameof(ThrottleWithObserveOn)\n");

        var sc = new CustomSyncContext();
        var scheduler = new SynchronizationContextScheduler(sc);
        var subj = new BehaviorSubject<string>("A");

        subj
            .Do(v => WriteLine($"Emitted v on sc.Elapsedms"))
            .Throttle(TimeSpan.FromMilliseconds(500))
            .ObserveOn(scheduler)
            .Subscribe(v => WriteLine($"Observed v on sc.Elapsedms"));

        await Task.Delay(100);
        subj.OnNext("B");
        await Task.Delay(200);
        subj.OnNext("X");
        await Task.Delay(550);
        subj.OnNext("Y");

        await Task.Delay(2000);
        WriteLine("Finished!");
    

    public static async Task Main()
    
        await ThrottleWithScheduler();
        await ThrottleWithObserveOn();
    


class CustomSyncContext : SynchronizationContext

    private readonly Stopwatch _sw = Stopwatch.StartNew();
    public long Elapsed  get  lock (_sw)  return _sw.ElapsedMilliseconds;   
    public override void Post(SendOrPostCallback d, object? state)
    
        WriteLine($"Scheduled on Elapsedms");
        Task.Delay(100).ContinueWith(
            continuationAction: _ =>
            
                WriteLine($"Executed on Elapsedms");
                d(state);
            ,
            continuationOptions: TaskContinuationOptions.ExecuteSynchronously);
    

输出:

ThrottleWithScheduler

Emitted A on 18ms
Emitted B on 142ms
Emitted X on 351ms
Scheduled on 861ms
Emitted Y on 907ms
Executed on 972ms
Scheduled on 1421ms
Executed on 1536ms
Observed Y on 1539ms
Finished!

ThrottleWithObserveOn

Emitted A on 4ms
Emitted B on 113ms
Emitted X on 315ms
Scheduled on 837ms
Emitted Y on 886ms
Executed on 951ms
Observed X on 953ms
Scheduled on 1391ms
Executed on 1508ms
Observed Y on 1508ms
Finished!

【讨论】:

以上是关于Rx Throttle(...).ObserveOn(scheduler) 和 Throttle(..., scheduler) 的区别的主要内容,如果未能解决你的问题,请参考以下文章

lodash 的 防抖(debounce)和节流(throttle)

js 中的截流函数throttle 和 debounce

频率组件throttle

JS debounce和throttle 去抖和节流

js 消抖(debounce)与节流(throttle)

实现debounce和throttle函数