Handling exceptions in System.Reactive's Subscribe

Posted: 2022-01-09 14:58:34

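I want to use System.Reactive to display the exceptions thrown in all of the subscriptions below.

The short snippet below shows what I tried, but the message is still not displayed.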

client.Streams.AggregateTradesStream
    .Subscribe(response =>
    {
        // Thrown on purpose to check whether the onError handler below fires.
        throw new Exception("Asd");

        Guard.Against.Null(response, nameof(response));
        Guard.Against.Null(response.Data, nameof(response.Data),
            "Something went wrong and the aggregated trade object is null");

        var trade = response.Data;
        Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                        $"Price: {trade.Price} Size: {trade.Quantity}");
    }, ex => Console.WriteLine("Exception: {0} {1}", ex.Message, DateTime.Now))
    .DisposeWith(disposable);

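How can I handle and display exceptions for all of the .Subscribe calls, given that what I did does not work? Perhaps an extension method could help.

Full code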

public class Program
{
    private static readonly ManualResetEvent ExitEvent = new(false);

    private static async Task Main()
    {
        Log.Logger = new LoggerConfiguration()
            .MinimumLevel.Verbose()
            .Enrich.FromLogContext()
            .WriteTo.Console(LogEventLevel.Debug, theme: SystemConsoleTheme.Colored)
            .WriteTo.File(Path.Combine("logs", "verbose.log"), rollingInterval: RollingInterval.Day)
            .CreateLogger();

        // Collects every subscription so they can all be disposed together on exit.
        var disposable = new CompositeDisposable();
        var uri = new Uri("wss://stream.binance.com:9443");

        using var communicator = new BinanceWebSocketCommunicator(uri);

        communicator.Name = "Binance-Spot";
        communicator.ReconnectTimeout = TimeSpan.FromMinutes(10);

        communicator.ReconnectionHappened
            .Subscribe(info => Log.Information($"Reconnection happened, type: {info.Type}"))
            .DisposeWith(disposable);

        communicator.DisconnectionHappened
            .Subscribe(info => Log.Information($"Disconnection happened, type: {info.Type}"))
            .DisposeWith(disposable);

        using var client = new BinanceWebSocketClient(communicator);

        client.Streams.PongStream
            .Subscribe(x => Log.Information($"Pong received ({x.Message})"))
            .DisposeWith(disposable);

        client.Streams.AggregateTradesStream
            .Subscribe(response =>
            {
                // Thrown on purpose to check whether the onError handler below fires.
                throw new Exception("Asd");

                Guard.Against.Null(response, nameof(response));
                Guard.Against.Null(response.Data, nameof(response.Data),
                    "Something went wrong and the aggregated trade object is null");

                var trade = response.Data;
                Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                                $"Price: {trade.Price} Size: {trade.Quantity}");
            }, ex => Console.WriteLine("Exception: {0} {1}", ex.Message, DateTime.Now))
            .DisposeWith(disposable);

        client.Streams.KlineStream
            .Subscribe(response =>
            {
                Guard.Against.Null(response, nameof(response));
                Guard.Against.Null(response.Data, nameof(response.Data),
                    "Something went wrong and the kline object is null");
                Guard.Against.Null(response.Data.Data, nameof(response.Data.Data),
                    "Something went wrong and the kline data object is null");

                var kline = response.Data;
                var klineData = response.Data.Data;

                Log.Information($"Kline [{kline.Symbol}] " +
                                $"Kline start time: {klineData.StartTime} " +
                                $"Kline close time: {klineData.CloseTime} " +
                                $"Interval: {klineData.Interval} " +
                                $"First trade ID: {klineData.FirstTradeId} " +
                                $"Last trade ID: {klineData.LastTradeId} " +
                                $"Open price: {klineData.OpenPrice} " +
                                $"Close price: {klineData.ClosePrice} " +
                                $"High price: {klineData.HighPrice} " +
                                $"Low price: {klineData.LowPrice} " +
                                $"Base asset volume: {klineData.BaseAssetVolume} " +
                                $"Number of trades: {klineData.NumberTrades} " +
                                $"Is this kline closed?: {klineData.IsClosed} " +
                                $"Quote asset volume: {klineData.QuoteAssetVolume} " +
                                $"Taker buy base: {klineData.TakerBuyBaseAssetVolume} " +
                                $"Taker buy quote: {klineData.TakerBuyQuoteAssetVolume} " +
                                $"Ignore: {klineData.Ignore} ");
            })
            .DisposeWith(disposable);

        client.AddSubscription(
            new AggregateTradeSubscription("bnbusdt"),
            new KlineSubscription("btcusdt", "1h"));

        await communicator.Start().ConfigureAwait(false);

        // Block the main thread until the exit event is signalled.
        ExitEvent.WaitOne();

        disposable.Dispose();

        Log.CloseAndFlush();
    }
}


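Answer 1:

I realized that, in order to display exceptions raised inside .Subscribe, I have to wrap the handler body in a try/catch block (or move the work into .Select/.SelectMany).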

client.Streams.AggregateTradesStream
    .Subscribe(response =>
    {
        try
        {
            Guard.Against.Null(response, nameof(response));
            Guard.Against.Null(response.Data, nameof(response.Data));

            var trade = response.Data;
            Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                            $"Price: {trade.Price} Size: {trade.Quantity}");
        }
        catch (Exception ex)
        {
            // Exceptions thrown by the handler are caught and logged here.
            logger.LogError(ex, "Exception while receiving message");
        }
    })
    .DisposeWith(disposable);
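
Since the question asks about an extension method, the try/catch above could be factored out once and reused for every subscription. The following is only a sketch: SubscribeCatching is a hypothetical name and is not part of System.Reactive, and Observable.Range stands in for the Binance streams so the example runs on its own. It relies on the fact that the onError callback passed to Subscribe receives errors signalled by the sequence itself, not exceptions thrown by the onNext handler, which is why the original attempt printed nothing.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

public static class ObservableCatchExtensions
{
    // Hypothetical helper, not part of System.Reactive.
    // Wraps onNext in a try/catch and forwards handler exceptions to onError.
    public static IDisposable SubscribeCatching<T>(
        this IObservable<T> source,
        Action<T> onNext,
        Action<Exception> onError)
    {
        return source.Subscribe(
            item =>
            {
                try
                {
                    onNext(item);
                }
                catch (Exception ex)
                {
                    // Handler failure: report it and keep the subscription alive.
                    onError(ex);
                }
            },
            // Errors signalled by the source sequence itself still arrive here.
            onError);
    }
}

public static class Demo
{
    // Runs a three-element sequence whose handler throws on the second element
    // and returns what was logged.
    public static List<string> Run()
    {
        var log = new List<string>();

        Observable.Range(1, 3).SubscribeCatching(
            x =>
            {
                if (x == 2) throw new InvalidOperationException("boom");
                log.Add($"Value: {x}");
            },
            ex => log.Add($"Exception: {ex.Message}"));

        return log;
    }

    public static void Main()
    {
        foreach (var line in Run()) Console.WriteLine(line);
    }
}
```

With such a helper, the subscription from the question would become client.Streams.AggregateTradesStream.SubscribeCatching(response => { ... }, ex => Console.WriteLine("Exception: {0} {1}", ex.Message, DateTime.Now)). One design point to keep in mind: because the exception is swallowed per message, the subscription keeps receiving later messages. Moving the work into an upstream operator such as Do or Select would instead turn the exception into an OnError notification and end that subscription.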
