处理 WebSocket 的类 close 和 dispose (Observable.Using)

Posted

技术标签:

【中文标题】处理 WebSocket 的类 close 和 dispose (Observable.Using)【英文标题】:Handling WebSocket's class close and dispose (Observable.Using) 【发布时间】:2022-01-06 02:29:05 【问题描述】:

如何处理WebSocket 的关闭和处置? simple.SubscribeToTicker().Subscribe(...) 也需要处理。

我看到some people 这样做如下,但我不确定Observable.Using(...) 的行为以及何时进行处置。

var message =
    Observable.Using(() => ws,
        _ => Observable.FromEventPattern<MessageReceivedEventArgs>(h => ws.MessageReceived += h,
            h => ws.MessageReceived -= h));

片段

using System.Net;
using System.Reactive.Linq;
using System.Text.Json;
using System.Text.Json.Serialization;
using RestSharp;
using WebSocket4Net;

namespace SimpleTest;

public class Simple

    public RestPriceTicker24Hr? GetTicker()
    
        const string uri = "https://api.binance.com";

        var client = new RestClient(uri);
        var request = new RestRequest("/api/v3/ticker/24hr?symbol=BNBUSDT", Method.GET);
        var response = client.Execute(request);

        if (response.StatusCode != HttpStatusCode.OK)
            throw new InvalidOperationException();

        var content = response.Content;

        var deserialize = JsonSerializer.Deserialize<RestPriceTicker24Hr>(content, new JsonSerializerOptions
        
            NumberHandling = JsonNumberHandling.AllowReadingFromString
        );

        return deserialize;
    

    public IObservable<PriceTicker24Hr?> SubscribeToTicker()
    
        const string uri = "wss://stream.binance.com:9443/ws";

        var ws = new WebSocket($"uri/bnbusdt@ticker") // TODO: Dispose
        
            AutoSendPingInterval = 3, // 3 seconds
            EnableAutoSendPing = true
        ;

        ws.Open();

        ws.Error += (_, e) =>  Console.WriteLine($"Exception: e.Exception.Message"); ;

        ws.Opened += (_, _) =>  Console.WriteLine("Connection opened"); ;

        ws.Closed += (_, _) =>  Console.WriteLine("Connection closed"); ;

        //var message =
        //    Observable.Using(() => ws,
        //        _ => Observable.FromEventPattern<MessageReceivedEventArgs>(h => ws.MessageReceived += h,
        //            h => ws.MessageReceived -= h));

        var message =
            Observable.FromEventPattern<MessageReceivedEventArgs>(h => ws.MessageReceived += h,
                h => ws.MessageReceived -= h);

        return message.Select(e =>
            JsonSerializer.Deserialize<PriceTicker24Hr>(e.EventArgs.Message, new JsonSerializerOptions
            
                NumberHandling = JsonNumberHandling.AllowReadingFromString
            ));
    


public class Program

    private static void GetData()
    
        var simple = new Simple();

        var ticker = simple.GetTicker();
        simple.SubscribeToTicker().Subscribe(message =>  Console.WriteLine($"Message: message?.BestAskPrice"); ); // TODO: Dispose
    

    private static void Main()
    
        GetData();

        Console.ReadKey();
    


public class PriceTicker24Hr

    [JsonPropertyName("e")] public string? EventType  get; set; 

    [JsonPropertyName("E")] public long EventTime  get; set; 

    [JsonPropertyName("s")] public string? Symbol  get; set; 

    [JsonPropertyName("p")] public decimal PriceChange  get; set; 

    [JsonPropertyName("P")] public decimal PriceChangePercent  get; set; 

    [JsonPropertyName("w")] public decimal WeightedAveragePrice  get; set; 

    [JsonPropertyName("x")] public decimal PreviousClosePrice  get; set; 

    [JsonPropertyName("c")] public decimal LastPrice  get; set; 

    [JsonPropertyName("Q")] public decimal LastQuantity  get; set; 

    [JsonPropertyName("b")] public decimal BestBidPrice  get; set; 

    [JsonPropertyName("B")] public decimal BestBidQuantity  get; set; 

    [JsonPropertyName("a")] public decimal BestAskPrice  get; set; 

    [JsonPropertyName("A")] public decimal BestAskQuantity  get; set; 

    [JsonPropertyName("o")] public decimal OpenPrice  get; set; 

    [JsonPropertyName("h")] public decimal HighPrice  get; set; 

    [JsonPropertyName("l")] public decimal LowPrice  get; set; 

    [JsonPropertyName("v")] public decimal TotalTradedBaseVolume  get; set; 

    [JsonPropertyName("q")] public decimal TotalTradedQuoteVolume  get; set; 

    [JsonPropertyName("O")] public long OpenTime  get; set; 

    [JsonPropertyName("C")] public long CloseTime  get; set; 

    [JsonPropertyName("F")] public long FirstTradeId  get; set; 

    [JsonPropertyName("L")] public long LastTradeId  get; set; 

    [JsonPropertyName("n")] public long Count  get; set; 


public class RestPriceTicker24Hr

    [JsonPropertyName("symbol")] public string? Symbol  get; set; 
    [JsonPropertyName("priceChange")] public decimal PriceChange  get; set; 

    [JsonPropertyName("priceChangePercent")]
    public decimal PriceChangePercent  get; set; 

    [JsonPropertyName("weightedAvgPrice")] public decimal WeightedAveragePrice  get; set; 
    [JsonPropertyName("prevClosePrice")] public decimal PreviousClosePrice  get; set; 
    [JsonPropertyName("lastPrice")] public decimal LastPrice  get; set; 
    [JsonPropertyName("lastQty")] public decimal LastQuantity  get; set; 
    [JsonPropertyName("bidPrice")] public decimal BestBidPrice  get; set; 
    [JsonPropertyName("bidQty")] public decimal BestBidQuantity  get; set; 
    [JsonPropertyName("askPrice")] public decimal BestAskPrice  get; set; 
    [JsonPropertyName("askQty")] public decimal BestAskQuantity  get; set; 
    [JsonPropertyName("openPrice")] public decimal OpenPrice  get; set; 
    [JsonPropertyName("highPrice")] public decimal HighPrice  get; set; 
    [JsonPropertyName("lowPrice")] public decimal LowPrice  get; set; 
    [JsonPropertyName("volume")] public decimal TotalTradedBaseVolume  get; set; 
    [JsonPropertyName("quoteVolume")] public decimal TotalTradedQuoteVolume  get; set; 
    [JsonPropertyName("openTime")] public long OpenTime  get; set; 
    [JsonPropertyName("closeTime")] public long CloseTime  get; set; 
    [JsonPropertyName("firstId")] public long FirstTradeId  get; set; 
    [JsonPropertyName("lastId")] public long LastTradeId  get; set; 
    [JsonPropertyName("count")] public long Count  get; set; 

【问题讨论】:

【参考方案1】:

每个都需要在 Observable.Using 内,以便在可观察对象结束时触发处置。

这就是它的样子:

public IObservable<PriceTicker24Hr?> SubscribeToTicker() =>
    Observable
        .Using(
            () =>
            
                const string uri = "wss://stream.binance.com:9443/ws";
                var ws = new WebSocket($"uri/bnbusdt@ticker") // TODO: Dispose
                
                    AutoSendPingInterval = 3, // 3 seconds
                    EnableAutoSendPing = true
                ;
                ws.Open();
                ws.Error += (_, e) =>  Console.WriteLine($"Exception: e.Exception.Message"); ;
                ws.Opened += (_, _) =>  Console.WriteLine("Connection opened"); ;
                ws.Closed += (_, _) =>  Console.WriteLine("Connection closed"); ;
                return ws;
            ,
            ws =>
                Observable
                    .FromEventPattern<MessageReceivedEventArgs>(
                        h => ws.MessageReceived += h,
                        h => ws.MessageReceived -= h)
                    .Select(e =>
                        JsonSerializer.Deserialize<PriceTicker24Hr>(
                            e.EventArgs.Message,
                            new JsonSerializerOptions
                            
                                NumberHandling = JsonNumberHandling.AllowReadingFromString
                            )));

这是一个可测试的示例,可以看到 Using 的实际效果:

IObservable<Unit> observable =
    Observable
        .Using(
            () => Disposable.Create(() => Console.WriteLine("Using Disposed!")),
            _ => Observable.Never<Unit>());
            
IDisposable subscription = observable.Subscribe();

subscription.Dispose();

在控制台上弹出Using Disposed!

这是另一个例子:

IObservable<int> observable =
    Observable
        .Using(
            () => Disposable.Create(() => Console.WriteLine("Using Disposed!")),
            _ => Observable.Repeat(42))
        .Take(1);
            
IDisposable subscription = observable.Subscribe(Console.WriteLine);

弹出以下内容:

42
Using Disposed!

希望这可以帮助您了解Observable.Using 的工作原理。

【讨论】:

嘿,伙计!谢谢你的描述性答案! :)

以上是关于处理 WebSocket 的类 close 和 dispose (Observable.Using)的主要内容,如果未能解决你的问题,请参考以下文章

WebSocket:调用 close() 不会触发 onclose 处理程序?

Tornado websocket 处理程序,self.close() 正在关闭连接而不触发 on_close() 方法

WebSocket Handler on_close 方法 async 实现tornado

Websocket 关闭浏览器报错

Wildfly/WebSocket/Apache : WebSocket 已经处于 CLOSING 或 CLOSED 状态

WebSocket 已经处于 CLOSING 或 CLOSED 状态 Socket io