捕获订阅 OnNext 操作可能引发的异常

Posted

技术标签:

【中文标题】捕获订阅 OnNext 操作可能引发的异常【英文标题】:Catching exceptions which may be thrown from a Subscription OnNext Action 【发布时间】:2011-11-04 19:41:08 【问题描述】:

我对 Rx.NET 有点陌生。是否有可能捕获任何订阅者可能抛出的异常?采取以下...

handler.FooStream.Subscribe(
            _ => throw new Exception("Bar"),
            _ =>  );

目前,我正在以每个订阅为基础使用以下实例。其中的实现只是使用 ManualResetEvent 来唤醒等待线程。

public interface IExceptionCatcher

    Action<T> Exec<T>(Action<T> action);

并像这样使用它......

handler.FooStream.Subscribe(
            _exceptionCatcher.Exec<Foo>(_ => throw new Exception("Bar")), //It's disappointing that this generic type can't be inferred
            _ =>  );

我觉得一定有更好的方法。 Rx.NET 中的所有错误处理功能都是专门用于处理可观察到的错误吗?

编辑:根据请求,我的实现是https://gist.github.com/1409829(接口和实现在产品代码中分成不同的程序集)。欢迎反馈。这可能看起来很傻,但我正在使用城堡温莎来管理许多不同的 Rx 订阅者。这个异常捕获器像这样在容器中注册

windsorContainer.Register(Component.For<IExceptionCatcher>().Instance(catcher));

然后它将像这样使用,其中observable 是 IObservable 的实例:

var exceptionCatcher =
    new ExceptionCatcher(e =>
                                
                                    Logger.FatalException(
                                        "Exception caught, shutting down.", e);
                                    // Deal with unmanaged resources here
                                , false);


/* 
 * Normally the code below exists in some class managed by an IoC container.
 * 'catcher' would be provided by the container.
 */
observable /* do some filtering, selecting, grouping etc */
    .SubscribeWithExceptionCatching(processItems, catcher);

【问题讨论】:

【参考方案1】:

默认情况下,内置的 Observable 运算符不会执行您要求的操作(很像事件),但您可以创建一个扩展方法来执行此操作。

public static IObservable<T> IgnoreObserverExceptions<T, TException>(
                                this IObservable<T> source
                               ) where TException : Exception

    return Observable.Create<T>(
        o => source.Subscribe(
            v =>  try  o.OnNext(v); 
                   catch (TException)  
            ,
            ex => o.OnError(ex),
            () => o.OnCompleted()
            ));

那么任何 observable 都可以用这个方法包装得到你描述的行为。

【讨论】:

@drstevens 它将捕获来自同一线程的异常。如果您的观察者正在启动自己的引发异常的异步操作,则不会捕获这些。 好吧,考虑到有多少 Rx 操作会导致一个新线程(或池中的任务),我想说很可能就是这种情况。在handler.FooStreamSubscribe 之间有问题是GroupByUntil(...).SelectMany(...).Buffer(with a time)。实际上,我最终按照您的示例创建了一个SubscribeWithCatch,它捕获异常,然后使用传递给 OnError 处理程序的相同操作。 @Jmix90 用我的代码更新的原始问题。欢迎反馈。 我编辑了答案,因为较新的 Rx 包不再有 Observable.CreateWithDisposable 方法。相反,他们有一个重载的Observable.Create 方法。 相关:How to handle exceptions in OnNext when using ObserveOn?

以上是关于捕获订阅 OnNext 操作可能引发的异常的主要内容,如果未能解决你的问题,请参考以下文章

未捕获的类型错误:无法读取未定义 ONn 数据表的属性“样式”

rxswift 绑定(onNext:VS 订阅(onNext:

在 Ruby 中捕获异常后重新引发(相同的异常)

为啥我的 onNext 没有触发我的 rx 订阅?

Python编程时的捕获异常

捕获异步 void 方法引发的异常