控制台应用程序中 Observable 的 LastAsync() 死锁
Posted
技术标签:
【中文标题】控制台应用程序中 Observable 的 LastAsync() 死锁【英文标题】:Observable's LastAsync() deadlocks in console app 【发布时间】:2021-12-14 16:21:24 【问题描述】:当使用IObservable.LastAsync()
强制我的控制台应用程序等待使用 Flurl 调用 API 的结果时,该 API 调用永远不会进行,主线程死锁并且永远不会从 LastAsync()
返回。我的目标是:
-
由于这是一个控制台应用程序,我不能真正“订阅”API 调用,因为这将允许主线程继续,可能导致它在 API 调用完成之前退出。所以我需要阻塞,直到获取到值。
API 调用应该推迟到第一个订阅者请求一个值。
第二个和后续订阅者不应导致另一个 API 调用,而是应返回流中的最后一个值(这是使用
Replay(1)
的目标)
这是一个重现问题的示例:
public static class Program
public static async Task Main(string[] args)
var obs = Observable.Defer(() =>
"https://api.publicapis.org"
.AppendPathSegment("entries")
.GetJsonAsync()
.ToObservable())
.Select(x => x.title)
.Replay(1);
var title = await obs.LastAsync();
Console.WriteLine($"Title 1: title");
如何修改我的示例以确保满足上述所有 3 个要求?为什么我的示例会导致死锁?
【问题讨论】:
【参考方案1】:Replay
返回 "connectable" observable,你需要调用 Connect()
方法来启动它。如果没有那个调用,它就不会订阅底层的 observable,也不会向自己的订阅者发送项目,所以这就是你有“死锁”的原因。
在这种情况下,您可以使用RefCount()
扩展方法代替手动连接,该方法将在第一个订阅者上自动连接,并在最后一个订阅者退订时断开连接。所以:
public static async Task Main(string[] args)
var obs = Observable.Defer(() =>
"https://api.publicapis.org"
.AppendPathSegment("entries")
.GetJsonAsync()
.ToObservable())
.Select(x => x.count)
.Replay(1)
.RefCount();
// makes request
var title = await obs.LastAsync();
Console.WriteLine($"Title 1: title");
// does not make request, obtains from replay cache
title = await obs.LastAsync();
Console.WriteLine($"Title 2: title");
你也可以使用AutoConnect
方法:
.Replay(1)
.AutoConnect(1);
这将在第一个订阅者上自动连接,但永远不会断开连接(在你的情况下应该无关紧要)。
【讨论】:
啊!我现在觉得很傻,完全忽略了Replay()
的返回类型。但我喜欢你的回答,尤其是你展示了不同的解决方案这一事实。这是一次很棒的学习经历。以上是关于控制台应用程序中 Observable 的 LastAsync() 死锁的主要内容,如果未能解决你的问题,请参考以下文章
在 redux-observable 中,我如何控制 reducer 或 epics 是不是首先对动作做出反应?
RxSwift:多次连接到 Connectable Observable