Reactive.NET - 从 .Subscribe 返回通用对象

Posted

技术标签:

【中文标题】Reactive.NET - 从 .Subscribe 返回通用对象【英文标题】:Reactive.NET - returning generic object out of .Subscribe 【发布时间】:2022-01-05 15:00:20 【问题描述】:

有一个组合的网络套接字流wss://stream.binance.com:9443/stream?streams=bnbusdt@ticker/dogeusdt@depth5,我需要以下输出:

public IObservable<WebSocketPriceTicker24Hr> Tickers => ...;
public IObservable<WebSocketDepth> Depth => ...;

日志

Connection opened
Message: "stream":"dogeusdt@depth5","data":"lastUpdateId":3740272226,"bids":[["0.20140000","21189.00000000"],["0.20130000","275878.00000000"],["0.20120000","290900.00000000"],["0.20110000","313592.00000000"],["0.20100000","367368.00000000"]],"asks":[["0.20150000","109090.00000000"],["0.20160000","404515.00000000"],["0.20170000","649409.00000000"],["0.20180000","360650.00000000"],["0.20190000","185381.00000000"]]
Message: "stream":"bnbusdt@ticker","data":"e":"24hrTicker","E":1638097890123,"s":"BNBUSDT","p":"-2.50000000","P":"-0.416","w":"598.07225116","x":"601.40000000","c":"599.00000000","Q":"0.45200000","b":"599.00000000","B":"122.06600000","a":"599.10000000","A":"0.54000000","o":"601.50000000","h":"621.30000000","l":"572.40000000","v":"1286613.77200000","q":"769487994.99120000","O":1638011490067,"C":1638097890067,"F":471394573,"L":472263211,"n":868639
Message: "stream":"dogeusdt@depth5","data":"lastUpdateId":3740272244,"bids":[["0.20140000","21189.00000000"],["0.20130000","273472.00000000"],["0.20120000","262491.00000000"],["0.20110000","350795.00000000"],["0.20100000","362174.00000000"]],"asks":[["0.20150000","129653.00000000"],["0.20160000","411961.00000000"],["0.20170000","634098.00000000"],["0.20180000","360650.00000000"],["0.20190000","194995.00000000"]]
Message: "stream":"bnbusdt@ticker","data":"e":"24hrTicker","E":1638097891059,"s":"BNBUSDT","p":"-2.50000000","P":"-0.416","w":"598.07224947","x":"601.40000000","c":"599.00000000","Q":"0.28800000","b":"599.00000000","B":"116.83300000","a":"599.10000000","A":"35.25500000","o":"601.50000000","h":"621.30000000","l":"572.40000000","v":"1286614.33800000","q":"769488331.33030000","O":1638011491059,"C":1638097891059,"F":471394579,"L":472263222,"n":868644
Message: "stream":"dogeusdt@depth5","data":"lastUpdateId":3740272263,"bids":[["0.20140000","84255.00000000"],["0.20130000","263544.00000000"],["0.20120000","290699.00000000"],["0.20110000","322587.00000000"],["0.20100000","362174.00000000"]],"asks":[["0.20150000","128586.00000000"],["0.20160000","422245.00000000"],["0.20170000","629711.00000000"],["0.20180000","365383.00000000"],["0.20190000","194995.00000000"]]

简要说明

流正在返回消息,如前面的日志所示。我需要将结果反序列化为WebSocketResponse&lt;T&gt;,但这就是问题所在。我需要以某种方式拆分消息,否则我不知道,但结果是以下属性:IObservable&lt;WebSocketPriceTicker24Hr&gt;IObservable&lt;WebSocketDepth&gt;

public IObservable<string> Messages => Observable
    .FromEventPattern<MessageReceivedEventArgs>(h => _webSocket.MessageReceived += h,
        h => _webSocket.MessageReceived -= h)
    .Select(e => e.EventArgs.Message);

...

public IObservable<WebSocketPriceTicker24Hr> Tickers => ...;
public IObservable<WebSocketDepth> Depth => ...;

_eventSubscription = _webSocket.Messages
    .Select(m => // string
    
        Console.WriteLine($"Message: m");

        // TODO: What here?
        //JsonSerializer.Deserialize<WebSocketResponse<WebSocketPriceTicker24Hr>>(m) ?? throw new ArgumentException(m, nameof(m));
        //JsonSerializer.Deserialize<WebSocketResponse<WebSocketDepth>>(m) ?? throw new ArgumentException(m, nameof(m));

        return m;
    )
    .Subscribe((result) =>  // IObservable<string>
        // TODO: What here?
    );

...

// Models
public class WebSocketResponse<T>

    public string? Stream  get; set; 
    public T? Data  get; set; 


public class WebSocketPriceTicker24Hr

    [JsonPropertyName("e")] public string? EventType  get; set; 

    [JsonPropertyName("E")] public long EventTime  get; set; 

    [JsonPropertyName("s")] public string? Symbol  get; set; 

    [JsonPropertyName("p")] public decimal PriceChange  get; set; 

    [JsonPropertyName("P")] public decimal PriceChangePercent  get; set; 

    [JsonPropertyName("w")] public decimal WeightedAveragePrice  get; set; 

    [JsonPropertyName("x")] public decimal PreviousClosePrice  get; set; 

    [JsonPropertyName("c")] public decimal LastPrice  get; set; 

    [JsonPropertyName("Q")] public decimal LastQuantity  get; set; 

    [JsonPropertyName("b")] public decimal BestBidPrice  get; set; 

    [JsonPropertyName("B")] public decimal BestBidQuantity  get; set; 

    [JsonPropertyName("a")] public decimal BestAskPrice  get; set; 

    [JsonPropertyName("A")] public decimal BestAskQuantity  get; set; 

    [JsonPropertyName("o")] public decimal OpenPrice  get; set; 

    [JsonPropertyName("h")] public decimal HighPrice  get; set; 

    [JsonPropertyName("l")] public decimal LowPrice  get; set; 

    [JsonPropertyName("v")] public decimal TotalTradedBaseVolume  get; set; 

    [JsonPropertyName("q")] public decimal TotalTradedQuoteVolume  get; set; 

    [JsonPropertyName("O")] public long OpenTime  get; set; 

    [JsonPropertyName("C")] public long CloseTime  get; set; 

    [JsonPropertyName("F")] public long FirstTradeId  get; set; 

    [JsonPropertyName("L")] public long LastTradeId  get; set; 

    [JsonPropertyName("n")] public long Count  get; set; 


public class WebSocketDepth

    [JsonPropertyName("e")] public string? EventType  get; set; 

    [JsonPropertyName("E")] public long EventTime  get; set; 

    [JsonPropertyName("s")] public string? Symbol  get; set; 

    [JsonPropertyName("U")] public long FirstUpdateId  get; set; 

    [JsonPropertyName("u")] public long FinalUpdateId  get; set; 

    [JsonPropertyName("b")]
    public IEnumerable<IEnumerable<string>> Bids  get; set;  = Array.Empty<IEnumerable<string>>();

    [JsonPropertyName("a")]
    public IEnumerable<IEnumerable<string>> Asks  get; set;  = Array.Empty<IEnumerable<string>>();

【问题讨论】:

您到底有什么问题?您可以使用您的 cmets 中已经提到的 JsonSerializer.Deserialize()。您可以使用OfType&lt;&gt;() 过滤特定类型的对象。 @Progman 这就是问题所在。我该怎么做? 【参考方案1】:

您可以构建一个 IObservable&lt;object&gt; 类型的 observable,其中包含 WebSocketPriceTicker24HrWebSocketDepth 对象。之后,您使用OfType&lt;T&gt;() 构建特定类型的可观察对象。

IObservable<object> afterDeserialize = source.Select<string, object>(it => 
    var ticker = JsonSerializer.Deserialize<WebSocketResponse<WebSocketPriceTicker24Hr>>(it);
    var depth = JsonSerializer.Deserialize<WebSocketResponse<WebSocketDepth>>(it);
    if (ticker != null && ticker.stream == "bnbusdt@ticker") 
        return ticker.data;
    
    if (depth != null && depth.stream == "dogeusdt@depth5") 
        return depth.data;
    
    throw new InvalidOperationException("Could not deserialize the JSON to any object");
);

这是反序列化/提取 JSON 的“数据”部分的一种可能方法,但这些硬编码的 stream 检查很难看。当格式不匹配时,JsonSerializer.Deserialize() 调用将不会返回 null。您必须将反序列化过程调整为更通用/更健壮。但对于概念验证,这将返回 WebSocketPriceTicker24HrWebSocketDepth 对象。

现在我们可以在这个 observable 上使用OfType&lt;T&gt;()

IObservable<WebSocketDepth> onlyDepths = afterDeserialize
    .OfType<WebSocketDepth>();

IObservable<WebSocketPriceTicker24Hr> onlyTicker = afterDeserialize
    .OfType<WebSocketPriceTicker24Hr>();

您可以从那里订阅onlyDepthsonlyTicker observables。

【讨论】:

谢谢!所以它更有可能github.com/JuliaZaborowska/TPUM/blob/…

以上是关于Reactive.NET - 从 .Subscribe 返回通用对象的主要内容,如果未能解决你的问题,请参考以下文章

从多个中获取单个数组

尝试从智能按钮创建订阅时出错

从雪花中的 Postgres 复制 FILTER 子句

从 Angular 向 Asp.net 发送 post 请求时出现错误 405

在构建apk文件后,IonViewDidLoad不会从Ionic中的服务类加载数据

Paypal IPN 获取订阅结束日期(循环)