Reactive.NET - returning generic object out of .Subscribe
【Posted】: 2022-01-05 15:00:20
【Question】: There is a combined web socket stream, wss://stream.binance.com:9443/stream?streams=bnbusdt@ticker/dogeusdt@depth5, and I need the following outputs:
public IObservable<WebSocketPriceTicker24Hr> Tickers => ...;
public IObservable<WebSocketDepth> Depth => ...;
Log
Connection opened
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272226,"bids":[["0.20140000","21189.00000000"],["0.20130000","275878.00000000"],["0.20120000","290900.00000000"],["0.20110000","313592.00000000"],["0.20100000","367368.00000000"]],"asks":[["0.20150000","109090.00000000"],["0.20160000","404515.00000000"],["0.20170000","649409.00000000"],["0.20180000","360650.00000000"],["0.20190000","185381.00000000"]]}}
Message: {"stream":"bnbusdt@ticker","data":{"e":"24hrTicker","E":1638097890123,"s":"BNBUSDT","p":"-2.50000000","P":"-0.416","w":"598.07225116","x":"601.40000000","c":"599.00000000","Q":"0.45200000","b":"599.00000000","B":"122.06600000","a":"599.10000000","A":"0.54000000","o":"601.50000000","h":"621.30000000","l":"572.40000000","v":"1286613.77200000","q":"769487994.99120000","O":1638011490067,"C":1638097890067,"F":471394573,"L":472263211,"n":868639}}
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272244,"bids":[["0.20140000","21189.00000000"],["0.20130000","273472.00000000"],["0.20120000","262491.00000000"],["0.20110000","350795.00000000"],["0.20100000","362174.00000000"]],"asks":[["0.20150000","129653.00000000"],["0.20160000","411961.00000000"],["0.20170000","634098.00000000"],["0.20180000","360650.00000000"],["0.20190000","194995.00000000"]]}}
Message: {"stream":"bnbusdt@ticker","data":{"e":"24hrTicker","E":1638097891059,"s":"BNBUSDT","p":"-2.50000000","P":"-0.416","w":"598.07224947","x":"601.40000000","c":"599.00000000","Q":"0.28800000","b":"599.00000000","B":"116.83300000","a":"599.10000000","A":"35.25500000","o":"601.50000000","h":"621.30000000","l":"572.40000000","v":"1286614.33800000","q":"769488331.33030000","O":1638011491059,"C":1638097891059,"F":471394579,"L":472263222,"n":868644}}
Message: {"stream":"dogeusdt@depth5","data":{"lastUpdateId":3740272263,"bids":[["0.20140000","84255.00000000"],["0.20130000","263544.00000000"],["0.20120000","290699.00000000"],["0.20110000","322587.00000000"],["0.20100000","362174.00000000"]],"asks":[["0.20150000","128586.00000000"],["0.20160000","422245.00000000"],["0.20170000","629711.00000000"],["0.20180000","365383.00000000"],["0.20190000","194995.00000000"]]}}
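Brief explanation
The stream returns messages as shown in the log above. I need to deserialize each result into WebSocketResponse<T>, and that is where the problem lies. I need to split the messages somehow (or use some other approach I am not aware of) so that the end result is the following two properties: IObservable<WebSocketPriceTicker24Hr> and IObservable<WebSocketDepth>.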
public IObservable<string> Messages => Observable
    .FromEventPattern<MessageReceivedEventArgs>(h => _webSocket.MessageReceived += h,
        h => _webSocket.MessageReceived -= h)
    .Select(e => e.EventArgs.Message);
...
public IObservable<WebSocketPriceTicker24Hr> Tickers => ...;
public IObservable<WebSocketDepth> Depth => ...;
_eventSubscription = _webSocket.Messages
    .Select(m => // string
    {
        Console.WriteLine($"Message: {m}");
        // TODO: What here?
        //JsonSerializer.Deserialize<WebSocketResponse<WebSocketPriceTicker24Hr>>(m) ?? throw new ArgumentException(m, nameof(m));
        //JsonSerializer.Deserialize<WebSocketResponse<WebSocketDepth>>(m) ?? throw new ArgumentException(m, nameof(m));
        return m;
    })
    .Subscribe((result) => // IObservable<string>
    {
        // TODO: What here?
    });
...
// Models
public class WebSocketResponse<T>
{
    public string? Stream { get; set; }
    public T? Data { get; set; }
}
public class WebSocketPriceTicker24Hr
{
    [JsonPropertyName("e")] public string? EventType { get; set; }
    [JsonPropertyName("E")] public long EventTime { get; set; }
    [JsonPropertyName("s")] public string? Symbol { get; set; }
    [JsonPropertyName("p")] public decimal PriceChange { get; set; }
    [JsonPropertyName("P")] public decimal PriceChangePercent { get; set; }
    [JsonPropertyName("w")] public decimal WeightedAveragePrice { get; set; }
    [JsonPropertyName("x")] public decimal PreviousClosePrice { get; set; }
    [JsonPropertyName("c")] public decimal LastPrice { get; set; }
    [JsonPropertyName("Q")] public decimal LastQuantity { get; set; }
    [JsonPropertyName("b")] public decimal BestBidPrice { get; set; }
    [JsonPropertyName("B")] public decimal BestBidQuantity { get; set; }
    [JsonPropertyName("a")] public decimal BestAskPrice { get; set; }
    [JsonPropertyName("A")] public decimal BestAskQuantity { get; set; }
    [JsonPropertyName("o")] public decimal OpenPrice { get; set; }
    [JsonPropertyName("h")] public decimal HighPrice { get; set; }
    [JsonPropertyName("l")] public decimal LowPrice { get; set; }
    [JsonPropertyName("v")] public decimal TotalTradedBaseVolume { get; set; }
    [JsonPropertyName("q")] public decimal TotalTradedQuoteVolume { get; set; }
    [JsonPropertyName("O")] public long OpenTime { get; set; }
    [JsonPropertyName("C")] public long CloseTime { get; set; }
    [JsonPropertyName("F")] public long FirstTradeId { get; set; }
    [JsonPropertyName("L")] public long LastTradeId { get; set; }
    [JsonPropertyName("n")] public long Count { get; set; }
}
public class WebSocketDepth
{
    [JsonPropertyName("e")] public string? EventType { get; set; }
    [JsonPropertyName("E")] public long EventTime { get; set; }
    [JsonPropertyName("s")] public string? Symbol { get; set; }
    [JsonPropertyName("U")] public long FirstUpdateId { get; set; }
    [JsonPropertyName("u")] public long FinalUpdateId { get; set; }
    [JsonPropertyName("b")]
    public IEnumerable<IEnumerable<string>> Bids { get; set; } = Array.Empty<IEnumerable<string>>();
    [JsonPropertyName("a")]
    public IEnumerable<IEnumerable<string>> Asks { get; set; } = Array.Empty<IEnumerable<string>>();
}
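【Comments】:
What exactly is your problem? You can use JsonSerializer.Deserialize(), as already mentioned in the comments in your code. You can use OfType<>() to filter for objects of a specific type.
@Progman That's exactly the problem. How do I do that?
【Answer 1】: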
You can build an observable of type IObservable<object> that contains both WebSocketPriceTicker24Hr and WebSocketDepth objects. After that, you use OfType<T>() to build an observable for each specific type.
IObservable<object> afterDeserialize = source.Select<string, object>(it =>
{
    // Note: System.Text.Json matches property names case-sensitively by default, so the
    // lowercase "stream"/"data" keys need a case-insensitive option (or attributes) to bind.
    var ticker = JsonSerializer.Deserialize<WebSocketResponse<WebSocketPriceTicker24Hr>>(it);
    var depth = JsonSerializer.Deserialize<WebSocketResponse<WebSocketDepth>>(it);
    if (ticker != null && ticker.Stream == "bnbusdt@ticker")
        return ticker.Data;
    if (depth != null && depth.Stream == "dogeusdt@depth5")
        return depth.Data;
    throw new InvalidOperationException("Could not deserialize the JSON to any object");
});
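This is one possible way to deserialize and extract the "data" part of the JSON, but the hard-coded stream checks are ugly. Note that the JsonSerializer.Deserialize() call will not return null when the format does not match, which is why both results have to be checked against the stream name. You will have to adapt the deserialization to make it more generic and robust. As a proof of concept, though, this returns WebSocketPriceTicker24Hr and WebSocketDepth objects.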
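One way to make the deserialization less dependent on hard-coded stream names is to read the "stream" field first and then deserialize only the "data" element into the matching type. The following is a minimal sketch, not part of the original answer: StreamMessageParser and Parse are hypothetical names, and routing on the text after '@' is an assumption based on the stream names visible in the log ("bnbusdt@ticker", "dogeusdt@depth5").

```csharp
using System;
using System.Text.Json;

// Hypothetical helper (not from the original answer).
public static class StreamMessageParser
{
    // Returns the "data" payload deserialized as TTicker or TDepth,
    // chosen by the part of the stream name after '@'.
    public static object Parse<TTicker, TDepth>(string json)
        where TTicker : class
        where TDepth : class
    {
        using JsonDocument doc = JsonDocument.Parse(json);
        JsonElement root = doc.RootElement;

        string stream = root.GetProperty("stream").GetString()
            ?? throw new JsonException("The stream name is null");
        string data = root.GetProperty("data").GetRawText();

        // Assumption: names look like "<symbol>@<kind>", e.g. "ticker" or "depth5".
        string kind = stream.Substring(stream.IndexOf('@') + 1);

        object? result;
        if (kind == "ticker")
            result = JsonSerializer.Deserialize<TTicker>(data);
        else if (kind.StartsWith("depth", StringComparison.Ordinal))
            result = JsonSerializer.Deserialize<TDepth>(data);
        else
            throw new InvalidOperationException($"Unknown stream: {stream}");

        return result ?? throw new JsonException($"Empty payload for stream: {stream}");
    }
}
```

With this helper, the lambda passed to Select could shrink to something like source.Select<string, object>(m => StreamMessageParser.Parse<WebSocketPriceTicker24Hr, WebSocketDepth>(m)), and each message is parsed once instead of being deserialized into both response types.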
Now we can use OfType<T>() on this observable.
IObservable<WebSocketDepth> onlyDepths = afterDeserialize
.OfType<WebSocketDepth>();
IObservable<WebSocketPriceTicker24Hr> onlyTicker = afterDeserialize
.OfType<WebSocketPriceTicker24Hr>();
From there you can subscribe to the onlyDepths and onlyTicker observables.
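Because both typed observables are derived from the same source, each subscription would normally run the upstream Select (and therefore the deserialization) on its own. A possible refinement, sketched here under assumptions, is to share one upstream subscription with Publish().RefCount() from System.Reactive. The Subject<object> and the int/string payloads below are stand-ins for the web socket source and the two model types; SplitDemo is a hypothetical name.

```csharp
using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

public static class SplitDemo
{
    public static void Main()
    {
        // Stand-in for the deserialized message stream; values are pushed by hand.
        var source = new Subject<object>();

        // Publish + RefCount: both branches below share a single upstream subscription.
        IObservable<object> shared = source.Publish().RefCount();

        // Split the shared stream by runtime type, as OfType<T>() does in the answer.
        IObservable<int> numbers = shared.OfType<int>();
        IObservable<string> texts = shared.OfType<string>();

        using IDisposable a = numbers.Subscribe(n => Console.WriteLine($"number: {n}"));
        using IDisposable b = texts.Subscribe(s => Console.WriteLine($"text: {s}"));

        source.OnNext(1);      // reaches only the numbers branch
        source.OnNext("two");  // reaches only the texts branch
        source.OnCompleted();
    }
}
```

In the question's class, the same pattern would let the Tickers and Depth properties return shared.OfType<WebSocketPriceTicker24Hr>() and shared.OfType<WebSocketDepth>() respectively, leaving the decision of when to Subscribe to the caller.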
【Comments】:
Thanks! So it's more like github.com/JuliaZaborowska/TPUM/blob/…