System.Reactive - Disposing unknown amount of subscriptions at once
Posted: 2022-01-08 18:03:02
Question: I have an unknown number of subscriptions that I would like to dispose of at once, because there may end up being a lot of them. Is there a mechanism in System.Reactive for disposing of them all in one go? Maybe wrapping them in Observable.Using(() => Disposable.Create... would work?
client.Streams.PongStream.Subscribe(x =>
    Log.Information($"Pong received ({x.Message})"));

client.Streams.FundingStream.Subscribe(response =>
{
    var funding = response.Data;
    Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
                    $"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
                    $"index price {funding.IndexPrice}");
});

client.Streams.AggregateTradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
});

client.Streams.TradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
});

client.Streams.OrderBookPartialStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book snapshot [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
    Task.Delay(500).Wait();
    //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
});

client.Streams.OrderBookDiffStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book diff [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
});

client.Streams.BookTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Book ticker [{ob.Symbol}] " +
                    $"Best ask price: {ob.BestAskPrice} " +
                    $"Best ask qty: {ob.BestAskQty} " +
                    $"Best bid price: {ob.BestBidPrice} " +
                    $"Best bid qty: {ob.BestBidQty}");
});

client.Streams.KlineStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Kline [{ob.Symbol}] " +
                    $"Kline start time: {ob.Data.StartTime} " +
                    $"Kline close time: {ob.Data.CloseTime} " +
                    $"Interval: {ob.Data.Interval} " +
                    $"First trade ID: {ob.Data.FirstTradeId} " +
                    $"Last trade ID: {ob.Data.LastTradeId} " +
                    $"Open price: {ob.Data.OpenPrice} " +
                    $"Close price: {ob.Data.ClosePrice} " +
                    $"High price: {ob.Data.HighPrice} " +
                    $"Low price: {ob.Data.LowPrice} " +
                    $"Base asset volume: {ob.Data.BaseAssetVolume} " +
                    $"Number of trades: {ob.Data.NumberTrades} " +
                    $"Is this kline closed?: {ob.Data.IsClose} " +
                    $"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
                    $"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
                    $"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
                    $"Ignore: {ob.Data.Ignore} ");
});

client.Streams.MiniTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Mini-ticker [{ob.Symbol}] " +
                    $"Open price: {ob.OpenPrice} " +
                    $"Close price: {ob.ClosePrice} " +
                    $"High price: {ob.HighPrice} " +
                    $"Low price: {ob.LowPrice} " +
                    $"Base asset volume: {ob.BaseAssetVolume} " +
                    $"Quote asset volume: {ob.QuoteAssetVolume}");
});
This is what those streams actually are:
using System;
using System.Reactive.Linq;      // AsObservable()
using System.Reactive.Subjects;  // Subject<T>

public class BinanceClientStreams
{
    internal readonly Subject<PongResponse> PongSubject = new Subject<PongResponse>();
    internal readonly Subject<TradeResponse> TradesSubject = new Subject<TradeResponse>();
    internal readonly Subject<AggregatedTradeResponse> TradeBinSubject = new Subject<AggregatedTradeResponse>();
    internal readonly Subject<OrderBookPartialResponse> OrderBookPartialSubject =
        new Subject<OrderBookPartialResponse>();
    internal readonly Subject<OrderBookDiffResponse> OrderBookDiffSubject = new Subject<OrderBookDiffResponse>();
    internal readonly Subject<FundingResponse> FundingSubject = new Subject<FundingResponse>();
    internal readonly Subject<BookTickerResponse> BookTickerSubject = new Subject<BookTickerResponse>();
    internal readonly Subject<KlineResponse> KlineSubject = new Subject<KlineResponse>();
    internal readonly Subject<MiniTickerResponse> MiniTickerSubject = new Subject<MiniTickerResponse>();

    // PUBLIC

    /// <summary>
    /// Response stream to every ping request
    /// </summary>
    public IObservable<PongResponse> PongStream => PongSubject.AsObservable();

    /// <summary>
    /// Trades stream - emits every executed trade on Binance
    /// </summary>
    public IObservable<TradeResponse> TradesStream => TradesSubject.AsObservable();

    /// <summary>
    /// Chunk of trades - emits grouped trades together
    /// </summary>
    public IObservable<AggregatedTradeResponse> AggregateTradesStream => TradeBinSubject.AsObservable();

    /// <summary>
    /// Partial order book stream - emits small snapshot of the order book
    /// </summary>
    public IObservable<OrderBookPartialResponse> OrderBookPartialStream => OrderBookPartialSubject.AsObservable();

    /// <summary>
    /// Order book difference stream - emits small snapshot of the order book
    /// </summary>
    public IObservable<OrderBookDiffResponse> OrderBookDiffStream => OrderBookDiffSubject.AsObservable();

    /// <summary>
    /// Mark price and funding rate stream - emits mark price and funding rate for a single symbol pushed every 3 seconds or every second
    /// </summary>
    public IObservable<FundingResponse> FundingStream => FundingSubject.AsObservable();

    /// <summary>
    /// The best bid or ask's price or quantity in real-time for a specified symbol
    /// </summary>
    public IObservable<BookTickerResponse> BookTickerStream => BookTickerSubject.AsObservable();

    /// <summary>
    /// The Kline/Candlestick subscription, provide symbol and chart intervals
    /// </summary>
    public IObservable<KlineResponse> KlineStream => KlineSubject.AsObservable();

    /// <summary>
    /// Mini-ticker specified symbol statistics for the previous 24hrs
    /// </summary>
    public IObservable<MiniTickerResponse> MiniTickerStream => MiniTickerSubject.AsObservable();
}
Answer 1: I think what you are looking for is CompositeDisposable. You need to create an instance of that class and add all of your subscriptions to it.
// CompositeDisposable lives in the System.Reactive.Disposables namespace.
var compDisp = new CompositeDisposable();

compDisp.Add(client.Streams.PongStream.Subscribe(x =>
    Log.Information($"Pong received ({x.Message})")));

compDisp.Add(client.Streams.FundingStream.Subscribe(response =>
{
    var funding = response.Data;
    Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
                    $"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
                    $"index price {funding.IndexPrice}");
}));

compDisp.Add(client.Streams.AggregateTradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
}));

compDisp.Add(client.Streams.TradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
}));

compDisp.Add(client.Streams.OrderBookPartialStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book snapshot [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
    Task.Delay(500).Wait();
    //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
}));

compDisp.Add(client.Streams.OrderBookDiffStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book diff [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
}));

compDisp.Add(client.Streams.BookTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Book ticker [{ob.Symbol}] " +
                    $"Best ask price: {ob.BestAskPrice} " +
                    $"Best ask qty: {ob.BestAskQty} " +
                    $"Best bid price: {ob.BestBidPrice} " +
                    $"Best bid qty: {ob.BestBidQty}");
}));

compDisp.Add(client.Streams.KlineStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Kline [{ob.Symbol}] " +
                    $"Kline start time: {ob.Data.StartTime} " +
                    $"Kline close time: {ob.Data.CloseTime} " +
                    $"Interval: {ob.Data.Interval} " +
                    $"First trade ID: {ob.Data.FirstTradeId} " +
                    $"Last trade ID: {ob.Data.LastTradeId} " +
                    $"Open price: {ob.Data.OpenPrice} " +
                    $"Close price: {ob.Data.ClosePrice} " +
                    $"High price: {ob.Data.HighPrice} " +
                    $"Low price: {ob.Data.LowPrice} " +
                    $"Base asset volume: {ob.Data.BaseAssetVolume} " +
                    $"Number of trades: {ob.Data.NumberTrades} " +
                    $"Is this kline closed?: {ob.Data.IsClose} " +
                    $"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
                    $"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
                    $"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
                    $"Ignore: {ob.Data.Ignore} ");
}));

compDisp.Add(client.Streams.MiniTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Mini-ticker [{ob.Symbol}] " +
                    $"Open price: {ob.OpenPrice} " +
                    $"Close price: {ob.ClosePrice} " +
                    $"High price: {ob.HighPrice} " +
                    $"Low price: {ob.LowPrice} " +
                    $"Base asset volume: {ob.BaseAssetVolume} " +
                    $"Quote asset volume: {ob.QuoteAssetVolume}");
}));
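As soon as the compDisp instance is disposed, every subscription added to it is disposed as well. When exactly that should happen depends, of course, on the context of your application.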
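To make the disposal step concrete, here is a minimal, self-contained sketch. It assumes the System.Reactive NuGet package is referenced; the two Subject instances and the class name CompositeDisposableDemo are hypothetical stand-ins for the client streams in the question, not part of the Binance client.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

public static class CompositeDisposableDemo
{
    // Returns how many messages the handlers saw before and after disposal.
    public static (int Before, int After) Run()
    {
        // Hypothetical stand-ins for client.Streams.PongStream and TradesStream.
        var pongs = new Subject<string>();
        var trades = new Subject<int>();
        var received = 0;

        // CompositeDisposable is a collection of IDisposable,
        // so a collection initializer can be used instead of repeated Add calls.
        var subscriptions = new CompositeDisposable
        {
            pongs.Subscribe(_ => received++),
            trades.Subscribe(_ => received++)
        };

        pongs.OnNext("pong");
        trades.OnNext(1);
        var before = received;

        // One call tears down every subscription held by the composite.
        subscriptions.Dispose();

        // These notifications no longer reach the handlers.
        pongs.OnNext("pong");
        trades.OnNext(2);

        return (before, received);
    }

    public static void Main()
    {
        var (before, after) = Run();
        Console.WriteLine($"Delivered before dispose: {before}, after dispose: {after}");
    }
}
```

Two things worth knowing when you pick this approach: a disposable added to an already disposed CompositeDisposable is disposed immediately, and Clear() disposes everything currently held while leaving the composite itself usable for a new batch of subscriptions.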
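Edit:
Depending on your application architecture, the WhenActivated extension method may also be of interest to you. It is defined on the IActivatableView and IActivatableViewModel interfaces and accepts a function that is called every time the view (model) is activated (basically, whenever it is shown on screen). That function also receives a CompositeDisposable as a parameter, which is disposed every time the view (model) is deactivated.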
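Edit 2:
I just realized that the DisposeWith method, like the WhenActivated extension method, is actually part of the ReactiveUI framework and not part of the Reactive Extensions that the framework is built on. So without that framework you cannot write something like myObservable.Subscribe(x => ...).DisposeWith(compDisp), but compDisp.Add(myObservable.Subscribe(x => ...)) should work. I have adjusted the code above accordingly.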
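Comments:
Thanks for your answer! Here is what I did based on it: controlc.com/4d3cb01d. Could you take a look?
I didn't know that CompositeDisposable could also be constructed like that (as mentioned, I usually use ReactiveUI's DisposeWith method). However, I just modified a view model class in my application to test it, and it seems to work the way you did it.
Thank you very much! :) I added the extension method to my project and am currently using .DisposeWith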