为啥重复 Enumerable 到 Observable 转换块

Posted

技术标签:

【中文标题】为啥重复 Enumerable 到 Observable 转换块【英文标题】:Why does repeated Enumerable to Observable conversion block为什么重复 Enumerable 到 Observable 转换块 【发布时间】:2020-07-17 08:52:56 【问题描述】:

出于好奇,这是一个颇具教育意义的问题。考虑以下 sn-p:

var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();

SubscribeDebug 订阅了一个简单的观察者:

public class DebugObserver<T> : IObserver<T>

    public void OnCompleted()
    
        Debug.WriteLine("Completed");
    

    public void OnError(Exception error)
    
        Debug.WriteLine("Error");
    

    public void OnNext(T value)
    
        Debug.WriteLine("Value: 0", value);
    

这个的输出是:

值:0

值:1

值:2

值:3

值:4

然后阻塞。有人可以帮我理解它发生的根本原因以及为什么 observable 没有完成吗?我注意到它在没有 Concat 调用的情况下完成,但会阻塞。

【问题讨论】:

当你连接另一个已经完成的 observable 时,这种行为是否也存在? 由于使用了调度程序,您的代码正在创建死锁。试试这个:.ToObservable(Scheduler.Default)。这适用于您的代码。我需要花更多的时间来告诉你原因。 @Progman - 你走错了路。对enumerable.ToObservable() 的每个订阅都会再次启动可枚举。喜欢foreach 调用一个可枚举再次启动可枚举。这里的问题是Scheduler.Immediate调度器造成的死锁。 问题似乎不在于 Scheduler.Immediate,因为当我将它传递给 ToObservable() 时,两个枚举都被迭代了。然而,当在没有任何调度程序实现的情况下调用时,代码会阻塞。 @OguzOzgul 它只与来自所有静态调度程序的Scheduler.CurrentThread 发生死锁。所以这是我猜的默认值(当ToObservable 被调用时没有参数)。 【参考方案1】:

我查看了the source 的ToObservable 并提炼了一个最小的实现。它确实重现了我们所看到的行为。

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
        ToObservableEx(enumerable, CurrentThreadScheduler.Instance);

    public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
        Observable.Create<T>
        (
            observer =>
            
                IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
                
                    if (enumerator.MoveNext()) 
                    
                        observer.OnNext(enumerator.Current);
                        inner.Schedule(enumerator, loopRec); //<-- culprit
                    
                    else
                    
                        observer.OnCompleted();
                    

                    // ToObservable.cs Line 117
                    // We never allow the scheduled work to be cancelled. 
                    return Disposable.Empty;
                

                return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
            
        );

除此之外 - 问题的症结在于CurrentThreadScheduler 的行为,这是使用的默认调度程序。

CurrentThreadScheduler 的行为是,如果在调用Schedule 时调度已经在运行 - 它最终会被排队。

        CurrentThreadScheduler.Instance.Schedule(() =>
        
            CurrentThreadScheduler.Instance.Schedule(() =>
                Console.WriteLine(1)
            );

            Console.WriteLine(2);
        );

这将打印2 1。这种排队行为是我们的失败。

observer.OnCompleted() 被调用时,它会导致Concat 开始下一个枚举 - 然而,事情与我们开始时不同 - 当我们尝试安排下一个时,我们仍然在 observer =&gt; 块内一。因此,不是立即执行,而是下一个计划排队。

现在enumerator.MoveNext() 陷入死锁。 它无法移动到下一个项目 - MoveNext 一直阻塞,直到下一个项目到达 - 只有在 ToObservable 循环安排时才能到达。

但是调度程序只能通知ToEnumerable 和随后被阻止的MoveNext() - 一旦它退出loopRec - 它不能因为它首先被MoveNext 阻止。

附录

这大约是ToEnumerable(来自GetEnumerator.cs)所做的(不是有效的实现):

    public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
    
        var gate = new SemaphoreSlim(0);
        var queue = new ConcurrentQueue<T>();

        using(observable.Subscribe(
            value =>  queue.Enqueue(value); gate.Release(); , 
            () => gate.Release()))
        while (true)
        
            gate.Wait(); //this is where it blocks                

            if (queue.TryDequeue(out var current))
                yield return current;
            else
                break;
        
    

Enumerables 预计会阻塞,直到产生下一个项目 - 这就是为什么有一个门控实现。阻塞的不是Enumerable.Range,而是ToEnumerable

【讨论】:

但是我实现了一个自定义 IEnumerable 并返回了一个自定义 IEnumerator 我看到的是,当第一个枚举数的迭代完成时,再次调用 GetEnumerator() 并且(a我返回了新的),但永远不会调用 MoveNext()。 我应该澄清一下——重要的不是你自己的IEnumerable——而是Observable.ToEnumerable() 返回的那个。这就是它阻塞的地方。 @TheodorZoulias 谢谢!一旦我实现了ToObservable,问题就变得更加清晰了——Rx 堆栈跟踪相当不可读。编写简化的 Rx 操作符以了解其行为是一种很好的学习体验。 :) 难以置信!谢谢你这么详细的回答! 绝妙的答案!

以上是关于为啥重复 Enumerable 到 Observable 转换块的主要内容,如果未能解决你的问题,请参考以下文章

扩展 Enumerable.Range [重复]

为啥 Enumerable 在 Ruby 中没有长度属性?

为啥 Enumerable.Range 实现 IDisposable?

为啥 Enumerable 不实现 IEnumerable?

为啥 Enumerable 中的方法返回 Enumerator?

为啥 Enumerable#detect 需要 Proc/lambda?