如何实现一个“更好的”Finally Rx 操作符?
Posted
技术标签:
【中文标题】如何实现一个“更好的”Finally Rx 操作符?【英文标题】:How to implement a "better" Finally Rx operator? 【发布时间】:2021-12-17 07:22:07 【问题描述】:最近我意识到 Rx Finally
运算符的行为方式至少对我来说是出乎意料的。我的期望是finallyAction
抛出的任何错误都会传播到下游的操作员观察者。唉,这不是发生的事情。在现实中,操作符 first 将先行序列的完成(或失败)传播给它的观察者,然后 then 调用action
,在某个时间点不可能传播该操作引发的潜在错误。所以它会在ThreadPool
上抛出错误,并使进程崩溃。这不仅出乎意料,而且问题很大。以下是此行为的最小演示:
Observable
.Timer(TimeSpan.FromMilliseconds(100))
.Finally(() => throw new ApplicationException("Oops!"))
.Subscribe(_ => , ex => Console.WriteLine(ex.Message),
() => Console.WriteLine("Completed"));
Thread.Sleep(1000);
结果:未处理的异常 (Fiddle)
Finally
lambda 引发的异常不会由 Subscribe
:onError
处理程序处理,因为它是可取的。
这个功能(我很想称之为缺陷)严重限制了Finally
运算符在我眼中的实用性。本质上,我只能在我想调用一个预计永远不会失败的操作时使用它,如果它失败,则表明应用程序状态的灾难性损坏,而此时无法恢复。例如,我可以将它用于Release
和SemaphoreSlim
(例如我已经完成here),如果我的代码有错误,它只会失败。在这种情况下,我的应用程序崩溃了。但我也使用它recently 来调用调用者提供的未知操作,该操作可能会失败,并且在这种情况下使应用程序崩溃是不可接受的。相反,错误应该传播到下游。所以我在这里要问的是如何实现具有相同签名的Finally
变体(我们称之为FinallySafe
),以及下面指定的行为:
public static IObservable<TSource> FinallySafe<TSource>(
this IObservable<TSource> source, Action finallyAction);
-
应在
source
序列发出OnCompleted
或OnError
通知之后调用finallyAction
,但之前此通知传播到观察者。
如果finallyAction
调用成功完成,原始的OnCompleted
/OnError
通知应该传播给观察者。
如果finallyAction
调用失败,则应将OnError
通知传播给观察者,其中包含刚刚发生的错误。在这种情况下,应该忽略(不传播)之前的错误,即可能导致 source
完成失败的错误。
finallyAction
在source
完成之前取消订阅时,也应该调用finallyAction
。当订阅者(观察者)处理订阅时,finallyAction
应该被同步调用,并且任何错误都应该传播给Dispose
方法的调用者。
如果FinallySafe
被多个观察者订阅,则finallyAction
应该在每个订阅中调用一次,对于每个订阅者来说都是独立的,遵循上述规则。并发调用没问题。
finallyAction
不应为每个订阅者调用一次以上。
验证:将上面代码 sn-p 中的Finally
替换为FinallySafe
,应该会导致程序不会因未处理的异常而崩溃。
替代方案:我也愿意接受一个可以合理解释为什么内置 Finally
运算符的行为比自定义 FinallySafe
运算符的行为更好的答案,如上所述。
【问题讨论】:
我不确定,但是否应该在异常后不调用 finally 块?诸如“读取文件并最终关闭它”之类的东西。这是 finally-block 内部的一个异常,不是使用它的方式。 @akop 这是一个很好的观点。在 Rx 世界中,尽管链接Catch
和 Finally
运算符是任何顺序都是可能的。这不是语言语法强加的固定顺序,就像在 C# 中一样。
“我的期望是finallyAction
抛出的任何错误都会传播到下游的操作员观察者。” - 这不应该是期望,因为 Rx 的合同只能有一个 OnError
或 OnCompleted
- 所以在序列完成后它不能有另一个错误。
“调用一个预期永远不会失败的动作”——这不只是编写健壮代码的一个例子吗?
@Enigmativity 编写防故障方法当然不是编写健壮代码的必要条件。否则,一半的内置 .NET API、记录为抛出 ArgumentException
s、InvalidOperationException
s、OperationCanceledException
s 等的 API 将被视为非稳健 API。期望每个方法都能处理其调用期间可能发生的所有错误是不现实的。
【参考方案1】:
Finally
在序列结束后被调用,并且由于 Rx 合约只允许一个 OnError
或 OnCompleted
它不能发出第二个。
但是,如果您将Finally
替换为Do
,您可以获得您想要的行为。
试试这个代码:
Observable
.Timer(TimeSpan.FromMilliseconds(100))
.Do(_ => , () => throw new ApplicationException("Oops!"))
.Subscribe
(_ => ,
ex => Console.WriteLine(ex.Message),
() => Console.WriteLine("Completed"));
Thread.Sleep(TimeSpan.FromMilliseconds(1000));
按照您的预期运行。
我得到这个输出:
Oops!
如果你想在取消订阅时运行一些东西,那么使用这个扩展方法:
public static class Ext
public static IObservable<T> Unsubscribed<T>(this IObservable<T> source, Action unsubscribed) =>
Observable.Create<T>(o =>
new CompositeDisposable(source.Subscribe(o), Disposable.Create(unsubscribed)));
这是一个使用示例:
var source = Observable.Never<int>();
var subscription =
source
.Unsubscribed(() => Console.WriteLine("Unsubscribed"))
.Subscribe();
subscription.Dispose();
输出:
Unsubscribed
【讨论】:
感谢 Enigmativity 的回答。Do
运算符确实非常接近我正在寻找的内容。我希望这个运算符有一个接受onCompletedOrOnError
动作的重载,这样当我对source
是成功完成还是出现错误不感兴趣时,我可以将完成后的动作放在一个地方。不过我不能接受你的回答,因为它没有涵盖在取消订阅源时调用finallyAction
的要求。
@TheodorZoulias - 我用Unsubscribed
扩展方法更新了我的答案。
似乎Unsubscribed
运算符与Finally
运算符具有相同的行为。见this小提琴。就像Finally
一样,它会导致进程崩溃,这正是我试图避免的行为。
@TheodorZoulias - Unsubscribed
在 Rx 合约之外,所以你必须编写健壮的代码。
Enigmativity 我正在调用未知(用户提供的)代码,所以我不能保证它是防故障的,除非我将它包装在一个大的错误吞咽 try-catch 块中。这是我想避免的。因此我要求FinallySafe
运营商。【参考方案2】:
我阅读了文档,现在我确定了。 finally
-operator 将在完成后调用,不应抛出任何异常。
与非反应式编程相比:
StreamReader file = new StreamReader("file.txt");
string ln;
try
while ((ln = file.ReadLine()) != null)
Console.WriteLine(ln);
finally
// avoid to throw an exception inside of finally!
if (file != null)
file.close();
重要的是不要在finally
内抛出异常。
这是一个如何正确使用它的示例(fiddle):
using System;
using System.Reactive.Linq;
using System.Threading;
public class Program
public static void Main()
Observable
.Range(1,5) // simulates stream-reader
.Finally(() => Console.WriteLine("Close streamreader"))
.Do(i =>
if (i == 5)
throw new ApplicationException("Oops!"); // simulates IO-error
Console.WriteLine("Read " + i);
)
.Subscribe(_ => , ex => Console.WriteLine(ex.Message),
() => Console.WriteLine("Completed"));
Thread.Sleep(1000);
我不确定您要做什么(而且我对 c# 反应式还很陌生),但我认为您使用的运算符不正确。
编辑
但如果你愿意,你可以修补它。在这篇文章中,他们做了一些熟悉的事情。http://introtorx.com/Content/v1.0.10621.0/11_AdvancedErrorHandling.html
【讨论】:
akop 当然我使用了错误的运算符,因为我得到的行为(应用程序崩溃)不是我想要的行为(错误传播)。你知道哪个是正确的运营商吗?因为我没有。因此,我希望发明一个操作符,FinallySafe
,它的行为完全符合我的要求。
我也不知道合适的。在我的答案中链接的文章中,他们编写了一个新的 finally 运算符。也许这对你有帮助,为自己写这篇文章。
akop 好...我知道!这个问题的重点是为我提供所请求运算符的工作实现。我知道这是可以做到的。实际上,我手头已经有一个粗略的实现,我打算在测试所有极端情况后立即发布。
啊,我明白了。所以你想要我们的实施?我有一个建议:打开一个 Github 项目并将其链接到一个自己的答案中(带有一个小结论)。我会尽力在 Github 上帮助你。我对 RxJS 了解很多,但是 Rx.Net 对我来说有点新。
不,我将其发布在这里作为答案。大约 20 行代码,没什么特别的。【参考方案3】:
这是FinallySafe
运算符的实现,具有问题中指定的行为:
/// <summary>
/// Invokes a specified action after the source observable sequence terminates
/// successfully or exceptionally. The action is invoked before the propagation
/// of the source's completion, and any exception thrown by the action is
/// propagated to the observer. The action is also invoked if the observer
/// is unsubscribed before the termination of the source sequence.
/// </summary>
public static IObservable<T> FinallySafe<T>(this IObservable<T> source,
Action finallyAction)
return Observable.Create<T>(observer =>
var finallyOnce = Disposable.Create(finallyAction);
var subscription = source.Subscribe(observer.OnNext, error =>
try finallyOnce.Dispose();
catch (Exception ex) observer.OnError(ex); return;
observer.OnError(error);
, () =>
try finallyOnce.Dispose();
catch (Exception ex) observer.OnError(ex); return;
observer.OnCompleted();
);
return new CompositeDisposable(subscription, finallyOnce);
);
finallyAction
被分配为Disposable.Create
一次性实例的Dispose
操作,以确保该操作最多被调用一次。然后,通过使用 CompositeDisposable
实例将此一次性订阅与 source
的一次性订阅相结合。
作为旁注,我想解决这个问题,如果我们可以更进一步,在取消订阅期间向下游传播finallyAction
的可能错误。在某些情况下,这可能是可取的,但不幸的是,这是不可能的。首先,这样做会违反The Observable Contract 文档中的准则,该准则指出:
当观察者向 Observable 发出取消订阅通知时,Observable 将尝试停止向观察者发出通知。但是,不能保证 Observable 在观察者发出取消订阅通知后不会向观察者发出任何通知。
所以这样的实现是不合规的。更糟糕的是,Observable.Create
方法通过在订阅处理后立即静音observer
来强制执行此准则。它通过将观察者封装在 AutoDetachObserver
包装器中来实现。即使我们试图通过从头开始实现IObservable<T>
类型来规避这个限制,任何可以在我们的不合格Finally
运算符之后附加的内置运算符都会使我们的退订后OnError
通知静音。所以这是不可能的。退订期间的错误无法传播给刚刚请求退订的订阅者。
【讨论】:
以上是关于如何实现一个“更好的”Finally Rx 操作符?的主要内容,如果未能解决你的问题,请参考以下文章
如何实现模块与单片机的RX和TX连接并让它与PC机连接,就是实现TX和RX的分时复用功能