控制台应用程序中 Observable 的 LastAsync() 死锁

Posted

技术标签:

【中文标题】控制台应用程序中 Observable 的 LastAsync() 死锁【英文标题】:Observable's LastAsync() deadlocks in console app 【发布时间】:2021-12-14 16:21:24 【问题描述】:

当使用IObservable.LastAsync() 强制我的控制台应用程序等待使用 Flurl 调用 API 的结果时,该 API 调用永远不会进行,主线程死锁并且永远不会从 LastAsync() 返回。我的目标是:

    由于这是一个控制台应用程序,我不能真正“订阅”API 调用,因为这将允许主线程继续,可能导致它在 API 调用完成之前退出。所以我需要阻塞,直到获取到值。 API 调用应该推迟到第一个订阅者请求一个值。 第二个和后续订阅者不应导致另一个 API 调用,而是应返回流中的最后一个值(这是使用 Replay(1) 的目标)

这是一个重现问题的示例:

public static class Program

    public static async Task Main(string[] args)
    
        var obs = Observable.Defer(() =>
                "https://api.publicapis.org"
                    .AppendPathSegment("entries")
                    .GetJsonAsync()
                    .ToObservable())
            .Select(x => x.title)
            .Replay(1);

        var title = await obs.LastAsync();
        Console.WriteLine($"Title 1: title");
    

如何修改我的示例以确保满足上述所有 3 个要求?为什么我的示例会导致死锁?

【问题讨论】:

【参考方案1】:

Replay 返回 "connectable" observable,你需要调用 Connect() 方法来启动它。如果没有那个调用,它就不会订阅底层的 observable,也不会向自己的订阅者发送项目,所以这就是你有“死锁”的原因。

在这种情况下,您可以使用RefCount() 扩展方法代替手动连接,该方法将在第一个订阅者上自动连接,并在最后一个订阅者退订时断开连接。所以:

public static async Task Main(string[] args) 
    var obs = Observable.Defer(() =>
            "https://api.publicapis.org"
                .AppendPathSegment("entries")
                .GetJsonAsync()
                .ToObservable())
        .Select(x => x.count)
        .Replay(1)
        .RefCount();

    // makes request
    var title = await obs.LastAsync();
    Console.WriteLine($"Title 1: title");
    // does not make request, obtains from replay cache
    title = await obs.LastAsync();
    Console.WriteLine($"Title 2: title");

你也可以使用AutoConnect方法:

.Replay(1)
.AutoConnect(1);

这将在第一个订阅者上自动连接,但永远不会断开连接(在你的情况下应该无关紧要)。

【讨论】:

啊!我现在觉得很傻,完全忽略了Replay()的返回类型。但我喜欢你的回答,尤其是你展示了不同的解决方案这一事实。这是一次很棒的学习经历。

以上是关于控制台应用程序中 Observable 的 LastAsync() 死锁的主要内容,如果未能解决你的问题,请参考以下文章

在 redux-observable 中,我如何控制 reducer 或 epics 是不是首先对动作做出反应?

在 Observable 中插入 F# SQL

Observable 只发出一个值?

RxSwift:多次连接到 Connectable Observable

访问 Observable 数组的 _latestValue 元素

Rx 操作符五