处理 WebSocket 的类 close 和 dispose (Observable.Using)
Posted
技术标签:
【中文标题】处理 WebSocket 的类 close 和 dispose (Observable.Using)【英文标题】:Handling WebSocket's class close and dispose (Observable.Using) 【发布时间】:2022-01-06 02:29:05 【问题描述】:如何处理WebSocket
的关闭和处置? simple.SubscribeToTicker().Subscribe(...)
也需要处理。
我看到some people 这样做如下,但我不确定Observable.Using(...)
的行为以及何时进行处置。
var message =
Observable.Using(() => ws,
_ => Observable.FromEventPattern<MessageReceivedEventArgs>(h => ws.MessageReceived += h,
h => ws.MessageReceived -= h));
片段
using System.Net;
using System.Reactive.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using RestSharp;
using WebSocket4Net;
namespace SimpleTest;
public class Simple
public RestPriceTicker24Hr? GetTicker()
const string uri = "https://api.binance.com";
var client = new RestClient(uri);
var request = new RestRequest("/api/v3/ticker/24hr?symbol=BNBUSDT", Method.GET);
var response = client.Execute(request);
if (response.StatusCode != HttpStatusCode.OK)
throw new InvalidOperationException();
var content = response.Content;
var deserialize = JsonSerializer.Deserialize<RestPriceTicker24Hr>(content, new JsonSerializerOptions
NumberHandling = JsonNumberHandling.AllowReadingFromString
);
return deserialize;
public IObservable<PriceTicker24Hr?> SubscribeToTicker()
const string uri = "wss://stream.binance.com:9443/ws";
var ws = new WebSocket($"uri/bnbusdt@ticker") // TODO: Dispose
AutoSendPingInterval = 3, // 3 seconds
EnableAutoSendPing = true
;
ws.Open();
ws.Error += (_, e) => Console.WriteLine($"Exception: e.Exception.Message"); ;
ws.Opened += (_, _) => Console.WriteLine("Connection opened"); ;
ws.Closed += (_, _) => Console.WriteLine("Connection closed"); ;
//var message =
// Observable.Using(() => ws,
// _ => Observable.FromEventPattern<MessageReceivedEventArgs>(h => ws.MessageReceived += h,
// h => ws.MessageReceived -= h));
var message =
Observable.FromEventPattern<MessageReceivedEventArgs>(h => ws.MessageReceived += h,
h => ws.MessageReceived -= h);
return message.Select(e =>
JsonSerializer.Deserialize<PriceTicker24Hr>(e.EventArgs.Message, new JsonSerializerOptions
NumberHandling = JsonNumberHandling.AllowReadingFromString
));
public class Program
private static void GetData()
var simple = new Simple();
var ticker = simple.GetTicker();
simple.SubscribeToTicker().Subscribe(message => Console.WriteLine($"Message: message?.BestAskPrice"); ); // TODO: Dispose
private static void Main()
GetData();
Console.ReadKey();
public class PriceTicker24Hr
[JsonPropertyName("e")] public string? EventType get; set;
[JsonPropertyName("E")] public long EventTime get; set;
[JsonPropertyName("s")] public string? Symbol get; set;
[JsonPropertyName("p")] public decimal PriceChange get; set;
[JsonPropertyName("P")] public decimal PriceChangePercent get; set;
[JsonPropertyName("w")] public decimal WeightedAveragePrice get; set;
[JsonPropertyName("x")] public decimal PreviousClosePrice get; set;
[JsonPropertyName("c")] public decimal LastPrice get; set;
[JsonPropertyName("Q")] public decimal LastQuantity get; set;
[JsonPropertyName("b")] public decimal BestBidPrice get; set;
[JsonPropertyName("B")] public decimal BestBidQuantity get; set;
[JsonPropertyName("a")] public decimal BestAskPrice get; set;
[JsonPropertyName("A")] public decimal BestAskQuantity get; set;
[JsonPropertyName("o")] public decimal OpenPrice get; set;
[JsonPropertyName("h")] public decimal HighPrice get; set;
[JsonPropertyName("l")] public decimal LowPrice get; set;
[JsonPropertyName("v")] public decimal TotalTradedBaseVolume get; set;
[JsonPropertyName("q")] public decimal TotalTradedQuoteVolume get; set;
[JsonPropertyName("O")] public long OpenTime get; set;
[JsonPropertyName("C")] public long CloseTime get; set;
[JsonPropertyName("F")] public long FirstTradeId get; set;
[JsonPropertyName("L")] public long LastTradeId get; set;
[JsonPropertyName("n")] public long Count get; set;
public class RestPriceTicker24Hr
[JsonPropertyName("symbol")] public string? Symbol get; set;
[JsonPropertyName("priceChange")] public decimal PriceChange get; set;
[JsonPropertyName("priceChangePercent")]
public decimal PriceChangePercent get; set;
[JsonPropertyName("weightedAvgPrice")] public decimal WeightedAveragePrice get; set;
[JsonPropertyName("prevClosePrice")] public decimal PreviousClosePrice get; set;
[JsonPropertyName("lastPrice")] public decimal LastPrice get; set;
[JsonPropertyName("lastQty")] public decimal LastQuantity get; set;
[JsonPropertyName("bidPrice")] public decimal BestBidPrice get; set;
[JsonPropertyName("bidQty")] public decimal BestBidQuantity get; set;
[JsonPropertyName("askPrice")] public decimal BestAskPrice get; set;
[JsonPropertyName("askQty")] public decimal BestAskQuantity get; set;
[JsonPropertyName("openPrice")] public decimal OpenPrice get; set;
[JsonPropertyName("highPrice")] public decimal HighPrice get; set;
[JsonPropertyName("lowPrice")] public decimal LowPrice get; set;
[JsonPropertyName("volume")] public decimal TotalTradedBaseVolume get; set;
[JsonPropertyName("quoteVolume")] public decimal TotalTradedQuoteVolume get; set;
[JsonPropertyName("openTime")] public long OpenTime get; set;
[JsonPropertyName("closeTime")] public long CloseTime get; set;
[JsonPropertyName("firstId")] public long FirstTradeId get; set;
[JsonPropertyName("lastId")] public long LastTradeId get; set;
[JsonPropertyName("count")] public long Count get; set;
【问题讨论】:
【参考方案1】:每个都需要在 Observable.Using
内,以便在可观察对象结束时触发处置。
这就是它的样子:
public IObservable<PriceTicker24Hr?> SubscribeToTicker() =>
Observable
.Using(
() =>
const string uri = "wss://stream.binance.com:9443/ws";
var ws = new WebSocket($"uri/bnbusdt@ticker") // TODO: Dispose
AutoSendPingInterval = 3, // 3 seconds
EnableAutoSendPing = true
;
ws.Open();
ws.Error += (_, e) => Console.WriteLine($"Exception: e.Exception.Message"); ;
ws.Opened += (_, _) => Console.WriteLine("Connection opened"); ;
ws.Closed += (_, _) => Console.WriteLine("Connection closed"); ;
return ws;
,
ws =>
Observable
.FromEventPattern<MessageReceivedEventArgs>(
h => ws.MessageReceived += h,
h => ws.MessageReceived -= h)
.Select(e =>
JsonSerializer.Deserialize<PriceTicker24Hr>(
e.EventArgs.Message,
new JsonSerializerOptions
NumberHandling = JsonNumberHandling.AllowReadingFromString
)));
这是一个可测试的示例,可以看到 Using
的实际效果:
IObservable<Unit> observable =
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Using Disposed!")),
_ => Observable.Never<Unit>());
IDisposable subscription = observable.Subscribe();
subscription.Dispose();
在控制台上弹出Using Disposed!
。
这是另一个例子:
IObservable<int> observable =
Observable
.Using(
() => Disposable.Create(() => Console.WriteLine("Using Disposed!")),
_ => Observable.Repeat(42))
.Take(1);
IDisposable subscription = observable.Subscribe(Console.WriteLine);
弹出以下内容:
42
Using Disposed!
希望这可以帮助您了解Observable.Using
的工作原理。
【讨论】:
嘿,伙计!谢谢你的描述性答案! :)以上是关于处理 WebSocket 的类 close 和 dispose (Observable.Using)的主要内容,如果未能解决你的问题,请参考以下文章
WebSocket:调用 close() 不会触发 onclose 处理程序?
Tornado websocket 处理程序,self.close() 正在关闭连接而不触发 on_close() 方法
WebSocket Handler on_close 方法 async 实现tornado
Wildfly/WebSocket/Apache : WebSocket 已经处于 CLOSING 或 CLOSED 状态