如何实现一个“更好的”Finally Rx 操作符?

Posted

技术标签:

【中文标题】如何实现一个“更好的”Finally Rx 操作符?【英文标题】:How to implement a "better" Finally Rx operator? 【发布时间】:2021-12-17 07:22:07 【问题描述】:

最近我意识到 Rx Finally 运算符的行为方式至少对我来说是出乎意料的。我的期望是finallyAction 抛出的任何错误都会传播到下游的操作员观察者。唉,这不是发生的事情。在现实中,操作符 first 将先行序列的完成(或失败)传播给它的观察者,然后 then 调用action,在某个时间点不可能传播该操作引发的潜在错误。所以它会在ThreadPool 上抛出错误,并使进程崩溃。这不仅出乎意料,而且问题很大。以下是此行为的最小演示:

Observable
    .Timer(TimeSpan.FromMilliseconds(100))
    .Finally(() => throw new ApplicationException("Oops!"))
    .Subscribe(_ =>  , ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Completed"));

Thread.Sleep(1000);

结果:未处理的异常 (Fiddle)

Finally lambda 引发的异常不会由 Subscribe:onError 处理程序处理,因为它是可取的。

这个功能(我很想称之为缺陷)严重限制了Finally 运算符在我眼中的实用性。本质上,我只能在我想调用一个预计永远不会失败的操作时使用它,如果它失败,则表明应用程序状态的灾难性损坏,而此时无法恢复。例如,我可以将它用于ReleaseSemaphoreSlim(例如我已经完成here),如果我的代码有错误,它只会失败。在这种情况下,我的应用程序崩溃了。但我也使用它recently 来调用调用者提供的未知操作,该操作可能会失败,并且在这种情况下使应用程序崩溃是不可接受的。相反,错误应该传播到下游。所以我在这里要问的是如何实现具有相同签名的Finally 变体(我们称之为FinallySafe),以及下面指定的行为:

public static IObservable<TSource> FinallySafe<TSource>(
    this IObservable<TSource> source, Action finallyAction);
    应在source 序列发出OnCompletedOnError 通知之后调用finallyAction,但之前此通知传播到观察者。 如果finallyAction 调用成功完成,原始的OnCompleted/OnError 通知应该传播给观察者。 如果finallyAction 调用失败,则应将OnError 通知传播给观察者,其中包含刚刚发生的错误。在这种情况下,应该忽略(不传播)之前的错误,即可能导致 source 完成失败的错误。 finallyActionsource 完成之前取消订阅时,也应该调用finallyAction。当订阅者(观察者)处理订阅时,finallyAction 应该被同步调用,并且任何错误都应该传播给Dispose 方法的调用者。 如果FinallySafe 被多个观察者订阅,则finallyAction 应该在每个订阅中调用一次,对于每个订阅者来说都是独立的,遵循上述规则。并发调用没问题。 finallyAction 不应为每个订阅者调用一次以上。

验证:将上面代码 sn-p 中的Finally 替换为FinallySafe,应该会导致程序不会因未处理的异常而崩溃。

替代方案:我也愿意接受一个可以合理解释为什么内置 Finally 运算符的行为比自定义 FinallySafe 运算符的行为更好的答案,如上所述。

【问题讨论】:

我不确定,但是否应该在异常后不调用 finally 块?诸如“读取文件并最终关闭它”之类的东西。这是 finally-block 内部的一个异常,不是使用它的方式。 @akop 这是一个很好的观点。在 Rx 世界中,尽管链接 CatchFinally 运算符是任何顺序都是可能的。这不是语言语法强加的固定顺序,就像在 C# 中一样。 “我的期望是finallyAction 抛出的任何错误都会传播到下游的操作员观察者。” - 这不应该是期望,因为 Rx 的合同只能有一个 OnErrorOnCompleted - 所以在序列完成后它不能有另一个错误。 “调用一个预期永远不会失败的动作”——这不只是编写健壮代码的一个例子吗? @Enigmativity 编写防故障方法当然不是编写健壮代码的必要条件。否则,一半的内置 .NET API、记录为抛出 ArgumentExceptions、InvalidOperationExceptions、OperationCanceledExceptions 等的 API 将被视为非稳健 API。期望每个方法都能处理其调用期间可能发生的所有错误是不现实的。 【参考方案1】:

Finally 在序列结束后被调用,并且由于 Rx 合约只允许一个 OnErrorOnCompleted 它不能发出第二个。

但是,如果您将Finally 替换为Do,您可以获得您想要的行为。

试试这个代码:

Observable
    .Timer(TimeSpan.FromMilliseconds(100))
    .Do(_ =>  , () => throw new ApplicationException("Oops!"))
    .Subscribe
        (_ =>  ,
        ex => Console.WriteLine(ex.Message),
        () => Console.WriteLine("Completed"));

Thread.Sleep(TimeSpan.FromMilliseconds(1000));

按照您的预期运行。

我得到这个输出:

Oops!

如果你想在取消订阅时运行一些东西,那么使用这个扩展方法:

public static class Ext

    public static IObservable<T> Unsubscribed<T>(this IObservable<T> source, Action unsubscribed) =>
        Observable.Create<T>(o =>
            new CompositeDisposable(source.Subscribe(o), Disposable.Create(unsubscribed)));

这是一个使用示例:

var source = Observable.Never<int>();

var subscription =
    source
        .Unsubscribed(() => Console.WriteLine("Unsubscribed"))
        .Subscribe();

subscription.Dispose();

输出:

Unsubscribed

【讨论】:

感谢 Enigmativity 的回答。 Do 运算符确实非常接近我正在寻找的内容。我希望这个运算符有一个接受onCompletedOrOnError 动作的重载,这样当我对source 是成功完成还是出现错误不感兴趣时​​,我可以将完成后的动作放在一个地方。不过我不能接受你的回答,因为它没有涵盖在取消订阅源时调用finallyAction 的要求。 @TheodorZoulias - 我用Unsubscribed 扩展方法更新了我的答案。 似乎Unsubscribed 运算符与Finally 运算符具有相同的行为。见this小提琴。就像Finally 一样,它会导致进程崩溃,这正是我试图避免的行为。 @TheodorZoulias - Unsubscribed 在 Rx 合约之外,所以你必须编写健壮的代码。 Enigmativity 我正在调用未知(用户提供的)代码,所以我不能保证它是防故障的,除非我将它包装在一个大的错误吞咽 try-catch 块中。这是我想避免的。因此我要求FinallySafe 运营商。【参考方案2】:

我阅读了文档,现在我确定了。 finally-operator 将在完成后调用,不应抛出任何异常。

与非反应式编程相比:

StreamReader file = new StreamReader("file.txt");
string ln;  

try   
   while ((ln = file.ReadLine()) != null)   
      Console.WriteLine(ln);
   

finally 
   // avoid to throw an exception inside of finally!
   if (file != null) 
      file.close();
   

重要的是不要在finally 内抛出异常。

这是一个如何正确使用它的示例(fiddle):

using System;
using System.Reactive.Linq;
using System.Threading;

public class Program

    public static void Main()
    
        Observable
            .Range(1,5) // simulates stream-reader
            .Finally(() => Console.WriteLine("Close streamreader"))
            .Do(i => 
                if (i == 5) 
                    throw new ApplicationException("Oops!"); // simulates IO-error
                
                
                Console.WriteLine("Read " + i);
            )
            .Subscribe(_ =>  , ex => Console.WriteLine(ex.Message),
                () => Console.WriteLine("Completed"));

        Thread.Sleep(1000);
    

我不确定您要做什么(而且我对 c# 反应式还很陌生),但我认为您使用的运算符不正确。

编辑

但如果你愿意,你可以修补它。在这篇文章中,他们做了一些熟悉的事情。http://introtorx.com/Content/v1.0.10621.0/11_AdvancedErrorHandling.html

【讨论】:

akop 当然我使用了错误的运算符,因为我得到的行为(应用程序崩溃)不是我想要的行为(错误传播)。你知道哪个是正确的运营商吗?因为我没有。因此,我希望发明一个操作符,FinallySafe,它的行为完全符合我的要求。 我也不知道合适的。在我的答案中链接的文章中,他们编写了一个新的 finally 运算符。也许这对你有帮助,为自己写这篇文章。 akop 好...我知道!这个问题的重点是为我提供所请求运算符的工作实现。我知道这是可以做到的。实际上,我手头已经有一个粗略的实现,我打算在测试所有极端情况后立即发布。 啊,我明白了。所以你想要我们的实施?我有一个建议:打开一个 Github 项目并将其链接到一个自己的答案中(带有一个小结论)。我会尽力在 Github 上帮助你。我对 RxJS 了解很多,但是 Rx.Net 对我来说有点新。 不,我将其发布在这里作为答案。大约 20 行代码,没什么特别的。【参考方案3】:

这是FinallySafe 运算符的实现,具有问题中指定的行为:

/// <summary>
/// Invokes a specified action after the source observable sequence terminates
/// successfully or exceptionally. The action is invoked before the propagation
/// of the source's completion, and any exception thrown by the action is
/// propagated to the observer. The action is also invoked if the observer
/// is unsubscribed before the termination of the source sequence.
/// </summary>
public static IObservable<T> FinallySafe<T>(this IObservable<T> source,
    Action finallyAction)

    return Observable.Create<T>(observer =>
    
        var finallyOnce = Disposable.Create(finallyAction);
        var subscription = source.Subscribe(observer.OnNext, error =>
        
            try  finallyOnce.Dispose(); 
            catch (Exception ex)  observer.OnError(ex); return; 
            observer.OnError(error);
        , () =>
        
            try  finallyOnce.Dispose(); 
            catch (Exception ex)  observer.OnError(ex); return; 
            observer.OnCompleted();
        );
        return new CompositeDisposable(subscription, finallyOnce);
    );

finallyAction 被分配为Disposable.Create 一次性实例的Dispose 操作,以确保该操作最多被调用一次。然后,通过使用 CompositeDisposable 实例将此一次性订阅与 source 的一次性订阅相结合。

作为旁注,我想解决这个问题,如果我们可以更进一步,在取消订阅期间向下游传播finallyAction 的可能错误。在某些情况下,这可能是可取的,但不幸的是,这是不可能的。首先,这样做会违反The Observable Contract 文档中的准则,该准则指出:

当观察者向 Observable 发出取消订阅通知时,Observable 将尝试停止向观察者发出通知。但是,不能保证 Observable 在观察者发出取消订阅通知后不会向观察者发出任何通知。

所以这样的实现是不合规的。更糟糕的是,Observable.Create 方法通过在订阅处理后立即静音observer 来强制执行此准则。它通过将观察者封装在 AutoDetachObserver 包装器中来实现。即使我们试图通过从头开始实现IObservable&lt;T&gt; 类型来规避这个限制,任何可以在我们的不合格Finally 运算符之后附加的内置运算符都会使我们的退订后OnError 通知静音。所以这是不可能的。退订期间的错误无法传播给刚刚请求退订的订阅者。

【讨论】:

以上是关于如何实现一个“更好的”Finally Rx 操作符?的主要内容,如果未能解决你的问题,请参考以下文章

Rx 中的游戏更新渲染循环:如何确保一致的状态?

java finally深入探究

Rx学习

如何实现模块与单片机的RX和TX连接并让它与PC机连接,就是实现TX和RX的分时复用功能

IVA电脑硬件团-推荐区讯景RX6900XT到手价格8399元,比RTX3080更好的选择!

如何解决错误:使用 RX java 在实现室中“不确定如何处理插入方法的返回类型”