Handling exceptions in System.Reactive's Subscribe
Posted: 2022-01-09 14:58:34
Question: I am using System.Reactive and want to display the exceptions thrown in all of the subscriptions below.
The short snippet below shows what I tried, but the message is still not displayed.
client.Streams.AggregateTradesStream
    .Subscribe(response =>
    {
        // Thrown deliberately to test whether the error handler below runs.
        throw new Exception("Asd");

        Guard.Against.Null(response, nameof(response));
        Guard.Against.Null(response.Data, nameof(response.Data),
            "Something went wrong and the aggregated trade object is null");

        var trade = response.Data;
        Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                        $"Price: {trade.Price} Size: {trade.Quantity}");
    }, ex => Console.WriteLine("Exception: {0} {1}", ex.Message, DateTime.Now))
    .DisposeWith(disposable);
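How can I handle and display the exceptions from all of the .Subscribe calls, given that what I did does not work? Perhaps an extension method would help.
Full code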
public class Program
{
    private static readonly ManualResetEvent ExitEvent = new(false);

    private static async Task Main()
    {
        Log.Logger = new LoggerConfiguration()
            .MinimumLevel.Verbose()
            .Enrich.FromLogContext()
            .WriteTo.Console(LogEventLevel.Debug, theme: SystemConsoleTheme.Colored)
            .WriteTo.File(Path.Combine("logs", "verbose.log"), rollingInterval: RollingInterval.Day)
            .CreateLogger();

        var disposable = new CompositeDisposable();
        var uri = new Uri("wss://stream.binance.com:9443");

        using var communicator = new BinanceWebSocketCommunicator(uri);
        communicator.Name = "Binance-Spot";
        communicator.ReconnectTimeout = TimeSpan.FromMinutes(10);

        communicator.ReconnectionHappened
            .Subscribe(info => Log.Information($"Reconnection happened, type: {info.Type}"))
            .DisposeWith(disposable);

        communicator.DisconnectionHappened
            .Subscribe(info => Log.Information($"Disconnection happened, type: {info.Type}"))
            .DisposeWith(disposable);

        using var client = new BinanceWebSocketClient(communicator);

        client.Streams.PongStream
            .Subscribe(x => Log.Information($"Pong received ({x.Message})"))
            .DisposeWith(disposable);

        client.Streams.AggregateTradesStream
            .Subscribe(response =>
            {
                // Thrown deliberately to test whether the error handler below runs.
                throw new Exception("Asd");

                Guard.Against.Null(response, nameof(response));
                Guard.Against.Null(response.Data, nameof(response.Data),
                    "Something went wrong and the aggregated trade object is null");

                var trade = response.Data;
                Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                                $"Price: {trade.Price} Size: {trade.Quantity}");
            }, ex => Console.WriteLine("Exception: {0} {1}", ex.Message, DateTime.Now))
            .DisposeWith(disposable);

        client.Streams.KlineStream
            .Subscribe(response =>
            {
                Guard.Against.Null(response, nameof(response));
                Guard.Against.Null(response.Data, nameof(response.Data),
                    "Something went wrong and the kline object is null");
                Guard.Against.Null(response.Data.Data, nameof(response.Data.Data),
                    "Something went wrong and the kline data object is null");

                var kline = response.Data;
                var klineData = response.Data.Data;

                Log.Information($"Kline [{kline.Symbol}] " +
                                $"Kline start time: {klineData.StartTime} " +
                                $"Kline close time: {klineData.CloseTime} " +
                                $"Interval: {klineData.Interval} " +
                                $"First trade ID: {klineData.FirstTradeId} " +
                                $"Last trade ID: {klineData.LastTradeId} " +
                                $"Open price: {klineData.OpenPrice} " +
                                $"Close price: {klineData.ClosePrice} " +
                                $"High price: {klineData.HighPrice} " +
                                $"Low price: {klineData.LowPrice} " +
                                $"Base asset volume: {klineData.BaseAssetVolume} " +
                                $"Number of trades: {klineData.NumberTrades} " +
                                $"Is this kline closed?: {klineData.IsClosed} " +
                                $"Quote asset volume: {klineData.QuoteAssetVolume} " +
                                $"Taker buy base: {klineData.TakerBuyBaseAssetVolume} " +
                                $"Taker buy quote: {klineData.TakerBuyQuoteAssetVolume} " +
                                $"Ignore: {klineData.Ignore} ");
            })
            .DisposeWith(disposable);

        client.AddSubscription(
            new AggregateTradeSubscription("bnbusdt"),
            new KlineSubscription("btcusdt", "1h"));

        await communicator.Start().ConfigureAwait(false);

        ExitEvent.WaitOne();

        disposable.Dispose();
        Log.CloseAndFlush();
    }
}
Answer 1: I realized that, for exceptions raised inside .Subscribe to be displayed, I have to wrap the logic in .Select/.SelectMany or in a try/catch block.
client.Streams.AggregateTradesStream
    .Subscribe(response =>
    {
        try
        {
            Guard.Against.Null(response, nameof(response));
            Guard.Against.Null(response.Data, nameof(response.Data));

            var trade = response.Data;
            Log.Information($"Aggregated trade [{trade.Symbol}] [{trade.Side}] " +
                            $"Price: {trade.Price} Size: {trade.Quantity}");
        }
        catch (Exception ex)
        {
            // "logger" is not declared in the question's code; it is used here as in the original answer.
            logger.LogError(ex, "Exception while receiving message");
        }
    })
    .DisposeWith(disposable);
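The question asks whether an extension method could help. The error callback passed to Subscribe receives errors that the source sequence signals through OnError; an exception thrown inside the onNext handler itself is not routed to that callback, which is why the try/catch is needed. Below is a minimal sketch of how the try/catch could be moved into a reusable helper. The name SubscribeWithHandler is hypothetical and is not part of System.Reactive, and a Subject<int> stands in for the Binance streams so that the example runs on its own.

```csharp
using System;
using System.Reactive.Subjects;

public static class SafeSubscribeExtensions
{
    // Hypothetical helper (not a System.Reactive API): runs onNext inside
    // try/catch and forwards anything it throws to the same onError callback
    // that also receives errors signaled by the source sequence.
    public static IDisposable SubscribeWithHandler<T>(
        this IObservable<T> source,
        Action<T> onNext,
        Action<Exception> onError)
    {
        return source.Subscribe(
            value =>
            {
                try
                {
                    onNext(value);
                }
                catch (Exception ex)
                {
                    onError(ex);
                }
            },
            onError);
    }
}

public static class Demo
{
    public static void Main()
    {
        // Stand-in for a stream such as client.Streams.AggregateTradesStream.
        using var subject = new Subject<int>();

        using var subscription = subject.SubscribeWithHandler(
            value =>
            {
                if (value == 2)
                {
                    throw new InvalidOperationException("bad value");
                }

                Console.WriteLine($"Received {value}");
            },
            ex => Console.WriteLine($"Exception: {ex.Message}"));

        subject.OnNext(1);
        subject.OnNext(2); // the handler throws; the exception is reported
        subject.OnNext(3); // the subscription is still active
    }
}
```

Because the exception is caught inside the handler, the subscription stays alive and later items are still delivered. In the question's code, each .Subscribe(...) call could be replaced with such a helper, passing a shared logging callback as onError.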