调度程序:立即与 CurrentThread

Posted

技术标签:

【中文标题】调度程序:立即与 CurrentThread【英文标题】:Schedulers: Immediate vs. CurrentThread 【发布时间】:2015-09-09 23:45:26 【问题描述】:

在阅读explanation 后了解原因

Observable.Return(5)
  .Repeat()
  .Take(1)

永远不会完成,但是

Observable.Return(5, Scheduler.CurrentThread)
  .Repeat()
  .Take(1)

按预期工作。我仍然很困惑,我不知道为什么CurrentThread 实际上解决了这个问题。谁能给个明确的解释?

【问题讨论】:

我都试过了,它们对我来说都很好。你能发布重现问题的代码吗? 看到这个post。 @Enigmativity 第一个在 Linqpad 中为我锁定,虽然它确实打印了值 @NedStoyanov - 两者在 LINQPad 中对我来说都很好。你能告诉我你的测试代码是什么样的吗? @Enigmativity 使用 linqpad 4.55.03 和 Rx-Main 2.2.5。代码是这样的:void Main() Observable.Return(5).Repeat().Take(1).Subscribe(Console.WriteLine);。它打印 5 但底部的执行栏一直显示 【参考方案1】:

Ned Stoyanov 在上面的 cmets 中提供的 link 有 Dave Sexton 的精彩解释。

我会尝试用不同的方式来说明它。以 RecursiveMethod 中发生递归调用为例。

public class RecursiveTest()

    private bool _isDone;

    public void RecursiveMethod()
    
        if (!_isDone)
        
            RecursiveMethod();

           // Never gets here...
           _isDone = true;
        
      

您可以很容易地看到这将无限期地递归(直到出现 ***Exception),因为 _isDone 永远不会设置为 true。这是一个过于简化的示例,但基本上与您的第一个示例相同。

这是 Dave Sexton 的解释,用于描述您的第一个示例中发生的情况。

默认情况下,Return 使用 ImmediateScheduler 调用 OnNext(1) 然后 已完成()。重复不会引入任何并发,所以它看到 OnCompleted 立即,然后立即重新订阅 Return。 因为 Return 中没有蹦床,所以这种模式会重复, 无限期地阻塞当前线程。在此调用订阅 observable 永远不会返回。

换句话说,由于重入的无限循环,初始流程永远不会完全完成。所以我们需要一种方法来完成初始流程而无需重入。

让我们回到本文上面的 RecursiveTest 示例,避免无限递归的解决方案是什么?在再次执行 RecursiveMethod 之前,我们需要 RecursiveMethod 完成其流程。一种方法是创建一个队列并将对 RecursiveMethod 的调用排入队列,如下所示:

public void RecursiveMethod()

    if (!_isDone)
    
        Enqueue(RecursiveMethod);
        _isDone = true;
    
  

这样,初始流程将完成,_isDone 将设置为 true,并且当执行下一次对 RecursiveMethod 的调用时,将不再执行任何内容,从而避免无限递归。这几乎就是 Scheduler.CurrentThread 将对您的第二个示例执行的操作。

让我们看看 Dave Sexton 如何解释您的第二个示例的工作原理:

这里,Return 使用 CurrentTheadScheduler 调用 OnNext(1) 然后 已完成()。重复不会引入任何并发,所以它看到 OnCompleted 立即然后立即重新订阅 Return; 但是,第二次订阅 Return 会安排其(内部) 蹦床上的动作,因为它仍在 来自第一个计划(外部)操作的 OnCompleted 回调,因此 重复不会立即发生。这允许重复返回一个 一次性的 Take,最终调用 OnCompleted,取消 通过处理重复来重复,最终来自订阅的调用 返回。

再一次,我的示例确实被简化了,以便于理解,但这并不是它的工作原理。 Here you can see 调度程序是如何工作的。它使用他们所谓的 Trampoline,它基本上是一个确保没有可重入调用的队列。因此,所有调用都在同一线程上一个接一个地序列化。这样就可以完成初始流程,避免无限重入循环。

希望这更清楚:)

【讨论】:

感谢您的回答,它让我非常清楚。 链接已损坏:github.com/Reactive-Extensions/Rx.NET/blob/master/Rx.NET/Source/… 链接现已修复

以上是关于调度程序:立即与 CurrentThread的主要内容,如果未能解决你的问题,请参考以下文章

《Linux内核设计与实现》读书笔记- 进程的调度

JobService 的 onStartJob 方法在作业被调度后立即执行

ngrx 效果:由一个效果调度的动作是不是立即被其他效果处理?

强制 GcmTaskService 立即运行

调度后立即反应 Redux Store 值

进程调度算法时间片轮转调度算法多级反馈队列调度算法(Java实现)