为啥重复 Enumerable 到 Observable 转换块
Posted
技术标签:
【中文标题】为啥重复 Enumerable 到 Observable 转换块【英文标题】:Why does repeated Enumerable to Observable conversion block为什么重复 Enumerable 到 Observable 转换块 【发布时间】:2020-07-17 08:52:56 【问题描述】:出于好奇,这是一个颇具教育意义的问题。考虑以下 sn-p:
var enumerable = Enumerable.Range(0, 5);
var observable = enumerable.ToObservable();
var enu = observable.Concat(observable).ToEnumerable();
enu.ToObservable().SubscribeDebug();
SubscribeDebug
订阅了一个简单的观察者:
public class DebugObserver<T> : IObserver<T>
public void OnCompleted()
Debug.WriteLine("Completed");
public void OnError(Exception error)
Debug.WriteLine("Error");
public void OnNext(T value)
Debug.WriteLine("Value: 0", value);
这个的输出是:
值:0
值:1
值:2
值:3
值:4
然后阻塞。有人可以帮我理解它发生的根本原因以及为什么 observable 没有完成吗?我注意到它在没有 Concat
调用的情况下完成,但会阻塞。
【问题讨论】:
当你连接另一个已经完成的 observable 时,这种行为是否也存在? 由于使用了调度程序,您的代码正在创建死锁。试试这个:.ToObservable(Scheduler.Default)
。这适用于您的代码。我需要花更多的时间来告诉你原因。
@Progman - 你走错了路。对enumerable.ToObservable()
的每个订阅都会再次启动可枚举。喜欢foreach
调用一个可枚举再次启动可枚举。这里的问题是Scheduler.Immediate
调度器造成的死锁。
问题似乎不在于 Scheduler.Immediate,因为当我将它传递给 ToObservable() 时,两个枚举都被迭代了。然而,当在没有任何调度程序实现的情况下调用时,代码会阻塞。
@OguzOzgul 它只与来自所有静态调度程序的Scheduler.CurrentThread
发生死锁。所以这是我猜的默认值(当ToObservable
被调用时没有参数)。
【参考方案1】:
我查看了the source 的ToObservable
并提炼了一个最小的实现。它确实重现了我们所看到的行为。
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable) =>
ToObservableEx(enumerable, CurrentThreadScheduler.Instance);
public static IObservable<T> ToObservableEx<T>(this IEnumerable<T> enumerable, IScheduler scheduler) =>
Observable.Create<T>
(
observer =>
IDisposable loopRec(IScheduler inner, IEnumerator<T> enumerator)
if (enumerator.MoveNext())
observer.OnNext(enumerator.Current);
inner.Schedule(enumerator, loopRec); //<-- culprit
else
observer.OnCompleted();
// ToObservable.cs Line 117
// We never allow the scheduled work to be cancelled.
return Disposable.Empty;
return scheduler.Schedule(enumerable.GetEnumerator(), loopRec);
);
除此之外 - 问题的症结在于CurrentThreadScheduler
的行为,这是使用的默认调度程序。
CurrentThreadScheduler
的行为是,如果在调用Schedule
时调度已经在运行 - 它最终会被排队。
CurrentThreadScheduler.Instance.Schedule(() =>
CurrentThreadScheduler.Instance.Schedule(() =>
Console.WriteLine(1)
);
Console.WriteLine(2);
);
这将打印2 1
。这种排队行为是我们的失败。
当observer.OnCompleted()
被调用时,它会导致Concat
开始下一个枚举 - 然而,事情与我们开始时不同 - 当我们尝试安排下一个时,我们仍然在 observer =>
块内一。因此,不是立即执行,而是下一个计划排队。
现在enumerator.MoveNext()
陷入死锁。
它无法移动到下一个项目 - MoveNext
一直阻塞,直到下一个项目到达 - 只有在 ToObservable
循环安排时才能到达。
但是调度程序只能通知ToEnumerable
和随后被阻止的MoveNext()
- 一旦它退出loopRec
- 它不能因为它首先被MoveNext
阻止。
附录
这大约是ToEnumerable
(来自GetEnumerator.cs)所做的(不是有效的实现):
public static IEnumerable<T> ToEnumerableEx<T>(this IObservable<T> observable)
var gate = new SemaphoreSlim(0);
var queue = new ConcurrentQueue<T>();
using(observable.Subscribe(
value => queue.Enqueue(value); gate.Release(); ,
() => gate.Release()))
while (true)
gate.Wait(); //this is where it blocks
if (queue.TryDequeue(out var current))
yield return current;
else
break;
Enumerables 预计会阻塞,直到产生下一个项目 - 这就是为什么有一个门控实现。阻塞的不是Enumerable.Range
,而是ToEnumerable
。
【讨论】:
但是我实现了一个自定义 IEnumerableIEnumerable
——而是Observable.ToEnumerable()
返回的那个。这就是它阻塞的地方。
@TheodorZoulias 谢谢!一旦我实现了ToObservable
,问题就变得更加清晰了——Rx 堆栈跟踪相当不可读。编写简化的 Rx 操作符以了解其行为是一种很好的学习体验。 :)
难以置信!谢谢你这么详细的回答!
绝妙的答案!以上是关于为啥重复 Enumerable 到 Observable 转换块的主要内容,如果未能解决你的问题,请参考以下文章
为啥 Enumerable.Range 实现 IDisposable?
为啥 Enumerable 不实现 IEnumerable?