System.Reactive - Disposing unknown amount of subscriptions at once

Posted: 2022-01-08 18:03:02

Question:

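I have an unknown number of subscriptions that I want to dispose of at once, because there may end up being a lot of them. Is there a mechanism in System.Reactive for disposing of them all in one go? Perhaps wrapping them in Observable.Using(() => Disposable.Create... would work?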

client.Streams.PongStream.Subscribe(x =>
    Log.Information($"Pong received ({x.Message})"));

client.Streams.FundingStream.Subscribe(response =>
{
    var funding = response.Data;
    Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
                    $"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
                    $"index price {funding.IndexPrice}");
});

client.Streams.AggregateTradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
});

client.Streams.TradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
});

client.Streams.OrderBookPartialStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book snapshot [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
    Task.Delay(500).Wait();
    //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
});

client.Streams.OrderBookDiffStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book diff [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
});

client.Streams.BookTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Book ticker [{ob.Symbol}] " +
                    $"Best ask price: {ob.BestAskPrice} " +
                    $"Best ask qty: {ob.BestAskQty} " +
                    $"Best bid price: {ob.BestBidPrice} " +
                    $"Best bid qty: {ob.BestBidQty}");
});

client.Streams.KlineStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Kline [{ob.Symbol}] " +
                    $"Kline start time: {ob.Data.StartTime} " +
                    $"Kline close time: {ob.Data.CloseTime} " +
                    $"Interval: {ob.Data.Interval} " +
                    $"First trade ID: {ob.Data.FirstTradeId} " +
                    $"Last trade ID: {ob.Data.LastTradeId} " +
                    $"Open price: {ob.Data.OpenPrice} " +
                    $"Close price: {ob.Data.ClosePrice} " +
                    $"High price: {ob.Data.HighPrice} " +
                    $"Low price: {ob.Data.LowPrice} " +
                    $"Base asset volume: {ob.Data.BaseAssetVolume} " +
                    $"Number of trades: {ob.Data.NumberTrades} " +
                    $"Is this kline closed?: {ob.Data.IsClose} " +
                    $"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
                    $"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
                    $"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
                    $"Ignore: {ob.Data.Ignore} ");
});

client.Streams.MiniTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Mini-ticker [{ob.Symbol}] " +
                    $"Open price: {ob.OpenPrice} " +
                    $"Close price: {ob.ClosePrice} " +
                    $"High price: {ob.HighPrice} " +
                    $"Low price: {ob.LowPrice} " +
                    $"Base asset volume: {ob.BaseAssetVolume} " +
                    $"Quote asset volume: {ob.QuoteAssetVolume}");
});

This is what those streams actually are.

using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public class BinanceClientStreams
{
    internal readonly Subject<PongResponse> PongSubject = new Subject<PongResponse>();

    internal readonly Subject<TradeResponse> TradesSubject = new Subject<TradeResponse>();
    internal readonly Subject<AggregatedTradeResponse> TradeBinSubject = new Subject<AggregatedTradeResponse>();

    internal readonly Subject<OrderBookPartialResponse> OrderBookPartialSubject =
        new Subject<OrderBookPartialResponse>();

    internal readonly Subject<OrderBookDiffResponse> OrderBookDiffSubject = new Subject<OrderBookDiffResponse>();
    internal readonly Subject<FundingResponse> FundingSubject = new Subject<FundingResponse>();

    internal readonly Subject<BookTickerResponse> BookTickerSubject = new Subject<BookTickerResponse>();

    internal readonly Subject<KlineResponse> KlineSubject = new Subject<KlineResponse>();

    internal readonly Subject<MiniTickerResponse> MiniTickerSubject = new Subject<MiniTickerResponse>();

    // PUBLIC

    /// <summary>
    /// Response stream to every ping request
    /// </summary>
    public IObservable<PongResponse> PongStream => PongSubject.AsObservable();

    /// <summary>
    /// Trades stream - emits every executed trade on Binance
    /// </summary>
    public IObservable<TradeResponse> TradesStream => TradesSubject.AsObservable();

    /// <summary>
    /// Chunk of trades - emits grouped trades together
    /// </summary>
    public IObservable<AggregatedTradeResponse> AggregateTradesStream => TradeBinSubject.AsObservable();

    /// <summary>
    /// Partial order book stream - emits small snapshot of the order book
    /// </summary>
    public IObservable<OrderBookPartialResponse> OrderBookPartialStream => OrderBookPartialSubject.AsObservable();

    /// <summary>
    /// Order book difference stream - emits small snapshot of the order book
    /// </summary>
    public IObservable<OrderBookDiffResponse> OrderBookDiffStream => OrderBookDiffSubject.AsObservable();

    /// <summary>
    /// Mark price and funding rate stream - emits mark price and funding rate for a single symbol pushed every 3 seconds or every second
    /// </summary>
    public IObservable<FundingResponse> FundingStream => FundingSubject.AsObservable();

    /// <summary>
    /// The best bid or ask's price or quantity in real-time for a specified symbol
    /// </summary>
    public IObservable<BookTickerResponse> BookTickerStream => BookTickerSubject.AsObservable();

    /// <summary>
    /// The Kline/Candlestick subscription, provide symbol and chart intervals
    /// </summary>
    public IObservable<KlineResponse> KlineStream => KlineSubject.AsObservable();

    /// <summary>
    /// Mini-ticker specified symbol statistics for the previous 24hrs
    /// </summary>
    public IObservable<MiniTickerResponse> MiniTickerStream => MiniTickerSubject.AsObservable();
}

Answer 1:

I think what you are looking for is CompositeDisposable. You need to create an instance of that class and add all of your subscriptions to it.

var compDisp = new CompositeDisposable();

compDisp.Add(client.Streams.PongStream.Subscribe(x =>
    Log.Information($"Pong received ({x.Message})")));

compDisp.Add(client.Streams.FundingStream.Subscribe(response =>
{
    var funding = response.Data;
    Log.Information($"Funding: [{funding.Symbol}] rate:[{funding.FundingRate}] " +
                    $"mark price: {funding.MarkPrice} next funding: {funding.NextFundingTime} " +
                    $"index price {funding.IndexPrice}");
}));

compDisp.Add(client.Streams.AggregateTradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade aggreg [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
}));

compDisp.Add(client.Streams.TradesStream.Subscribe(response =>
{
    var trade = response.Data;
    Log.Information($"Trade normal [{trade.Symbol}] [{trade.Side}] " +
                    $"price: {trade.Price} size: {trade.Quantity}");
}));

compDisp.Add(client.Streams.OrderBookPartialStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book snapshot [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
    Task.Delay(500).Wait();
    //OrderBookPartialResponse.StreamFakeSnapshot(response.Data, comm);
}));

compDisp.Add(client.Streams.OrderBookDiffStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Order book diff [{ob.Symbol}] " +
                    $"bid: {ob.Bids.FirstOrDefault()?.Price:F} " +
                    $"ask: {ob.Asks.FirstOrDefault()?.Price:F}");
}));

compDisp.Add(client.Streams.BookTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Book ticker [{ob.Symbol}] " +
                    $"Best ask price: {ob.BestAskPrice} " +
                    $"Best ask qty: {ob.BestAskQty} " +
                    $"Best bid price: {ob.BestBidPrice} " +
                    $"Best bid qty: {ob.BestBidQty}");
}));

compDisp.Add(client.Streams.KlineStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Kline [{ob.Symbol}] " +
                    $"Kline start time: {ob.Data.StartTime} " +
                    $"Kline close time: {ob.Data.CloseTime} " +
                    $"Interval: {ob.Data.Interval} " +
                    $"First trade ID: {ob.Data.FirstTradeId} " +
                    $"Last trade ID: {ob.Data.LastTradeId} " +
                    $"Open price: {ob.Data.OpenPrice} " +
                    $"Close price: {ob.Data.ClosePrice} " +
                    $"High price: {ob.Data.HighPrice} " +
                    $"Low price: {ob.Data.LowPrice} " +
                    $"Base asset volume: {ob.Data.BaseAssetVolume} " +
                    $"Number of trades: {ob.Data.NumberTrades} " +
                    $"Is this kline closed?: {ob.Data.IsClose} " +
                    $"Quote asset volume: {ob.Data.QuoteAssetVolume} " +
                    $"Taker buy base: {ob.Data.TakerBuyBaseAssetVolume} " +
                    $"Taker buy quote: {ob.Data.TakerBuyQuoteAssetVolume} " +
                    $"Ignore: {ob.Data.Ignore} ");
}));

compDisp.Add(client.Streams.MiniTickerStream.Subscribe(response =>
{
    var ob = response.Data;
    Log.Information($"Mini-ticker [{ob.Symbol}] " +
                    $"Open price: {ob.OpenPrice} " +
                    $"Close price: {ob.ClosePrice} " +
                    $"High price: {ob.HighPrice} " +
                    $"Low price: {ob.LowPrice} " +
                    $"Base asset volume: {ob.BaseAssetVolume} " +
                    $"Quote asset volume: {ob.QuoteAssetVolume}");
}));

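Once the compDisp instance is disposed, all subscriptions added to it are disposed as well. When that should happen depends, of course, on the context of your application.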
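To see the effect in isolation, here is a minimal sketch that does not need the Binance client. The two Subject instances and the names pongs, trades and received are made up for illustration; only the System.Reactive NuGet package is assumed.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

// Hypothetical stand-ins for client.Streams.PongStream and TradesStream.
var pongs = new Subject<string>();
var trades = new Subject<int>();
var received = new List<string>();

// Collect every subscription in one place.
var compDisp = new CompositeDisposable();
compDisp.Add(pongs.Subscribe(p => received.Add($"pong {p}")));
compDisp.Add(trades.Subscribe(t => received.Add($"trade {t}")));

pongs.OnNext("a");   // recorded
trades.OnNext(1);    // recorded

// A single call disposes every subscription held by the composite.
compDisp.Dispose();

pongs.OnNext("b");   // nobody is subscribed any more, so nothing is recorded
trades.OnNext(2);

Console.WriteLine(received.Count);      // prints 2
Console.WriteLine(pongs.HasObservers);  // prints False
```

Note that a CompositeDisposable that has already been disposed will immediately dispose anything added to it afterwards, so create a fresh instance if you need to subscribe again.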

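Edit: Depending on your application architecture, the WhenActivated extension method may also be of interest to you. It is defined on the IActivatableView and IActivatableViewModel interfaces and takes a function that is called every time the view (model) is activated (basically, whenever it is shown on screen). That function also receives a CompositeDisposable as a parameter, which is disposed every time the view (model) is deactivated.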

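Edit 2: I just realized that the DisposeWith method is actually part of the ReactiveUI framework, like the WhenActivated extension method, and not part of the Reactive Extensions that framework is built on. So without that framework you cannot write something like myObservable.Subscribe(x => ...).DisposeWith(compDisp), but compDisp.Add(myObservable.Subscribe(x => ...)) should work. I have adjusted the code above accordingly.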
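If you like the fluent style but do not want to pull in ReactiveUI, a small extension method of your own gives much the same effect. This is only a sketch: the name AddTo and the class DisposableExtensions are hypothetical (the name was chosen so that it cannot clash with a DisposeWith that is already in scope), and the ticks subject is a stand-in for a real stream.

```csharp
using System;
using System.Reactive.Disposables;
using System.Reactive.Subjects;

var compDisp = new CompositeDisposable();
var ticks = new Subject<int>();   // hypothetical stream
var seen = 0;

// Fluent equivalent of compDisp.Add(ticks.Subscribe(...)).
ticks.Subscribe(_ => seen++).AddTo(compDisp);

ticks.OnNext(1);      // counted
compDisp.Dispose();   // unsubscribes
ticks.OnNext(2);      // ignored

Console.WriteLine(seen);   // prints 1

// Hypothetical helper, not part of System.Reactive.
public static class DisposableExtensions
{
    // Adds the disposable to the composite and returns it for further chaining.
    public static T AddTo<T>(this T disposable, CompositeDisposable composite)
        where T : IDisposable
    {
        if (composite is null) throw new ArgumentNullException(nameof(composite));
        composite.Add(disposable);
        return disposable;
    }
}
```

The helper only forwards to CompositeDisposable.Add, so it behaves exactly like the compDisp.Add(...) calls above.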

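Comments:

Thanks for the answer! This is what I did based on it: controlc.com/4d3cb01d. Could you check it?

I didn't know a CompositeDisposable could also be constructed that way (as mentioned, I usually use ReactiveUI's DisposeWith method). However, I just modified a view model class in my application to test it, and it seems to work your way.

Thank you very much! :) I added the extension method to my project and I am currently using .DisposeWith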
