System.Net.WebSockets.ClientWebSocket 不能可靠地工作?
Posted
技术标签:
【中文标题】System.Net.WebSockets.ClientWebSocket 不能可靠地工作?【英文标题】:System.Net.WebSockets.ClientWebSocket no reliably working? 【发布时间】:2015-09-23 20:18:06 【问题描述】:我正在根据这里的 url 实现从我们的 api 传回一个 web 套接字;
http://blogs.msdn.com/b/youssefm/archive/2012/07/17/building-real-time-web-apps-with-asp-net-webapi-and-websockets.aspx
现在的想法是用户将注册一个网络套接字。这是使用以下代码完成并工作的;
[HttpGet]
[Route("getsocket")]
public HttpResponseMessage GetWebSocket()
HttpContext.Current.AcceptWebSocketRequest(new TestWebSocketHandler());
return Request.CreateResponse(HttpStatusCode.SwitchingProtocols);
然后他们调用 api 开始执行一些特定的功能,然后将消息报告回同一个 websocket。
private static WebSocketWrapper _socket;
[HttpGet]
[Route("begin")]
public async Task<IHttpActionResult> StartRunning(string itemid)
try
if (_socket == null ||
_socket.State() == WebSocketState.Aborted ||
_socket.State() == WebSocketState.Closed ||
_socket.State() == WebSocketState.None)
_socket = WebSocketWrapper.Create("wss://localhost:44301/api/v1/testcontroller/getsocket");
_socket.OnConnect(OnConnect);
_socket.OnDisconnect(OnDisconnect);
_socket.OnMessage(OnMessage);
await _socket.ConnectAsync();
//builds first message to be sent and sends it
_socket.QueueMessage(firstTest);
catch(Exception ex)
//logs error
return Ok();
因此,客户端有效地创建了一个连接到服务器的新 websocket。然后他们调用第二条消息来触发服务器在通过的设备上启动一些测试。服务器启动测试并将消息广播回套接字(json消息模型包含设备ID,因此客户端可以过滤相关消息)。
当他们收到一条消息时,客户端将确认它并完成下一个测试等。
现在它在我第一次运行(编译后)时工作。但是,我希望能够让多个客户端连接到 websocket 列表(解决方案是表格的,它将运行的测试可能需要一段时间,因此它可能会在任何时候运行多个测试)。所以我认为这与静态 WebSocketWrapper 实例有关。
但是,他们要求在服务器上使用单个 websocket,并列出正在侦听的设备。因此,实际上所有消息都从一个服务器连接发送到所有客户端。客户端然后根据他们传递的 deviceid 过滤掉他们想要收听的消息。
当我尝试重新运行时,运行第二个测试,基本上是调用 getwebsocket 然后是 begin 方法,代码运行没有错误,但 onopen 方法永远不会被调用?就好像插座没有启动一样?
很遗憾,我们不能使用 signalr,因为没有指定
作为参考,套接字包装类是
公共类 WebSocketWrapper 私有常量 int ReceiveChunkSize = 2048; 私有常量 int SendChunkSize = 2048;
private readonly ClientWebSocket _ws;
private readonly Uri _uri;
private readonly CancellationTokenSource _cancellationTokenSource = new CancellationTokenSource();
private readonly CancellationToken _cancellationToken;
private Action<WebSocketWrapper> _onConnected;
private Action<TestResultModel, WebSocketWrapper> _onMessage;
private Action<WebSocketWrapper> _onDisconnected;
//private static Queue<TestResultModel> _messageQueue = new Queue<TestResultModel>();
private static BlockingCollection<TestResultModel> _messageQueue = new BlockingCollection<TestResultModel>();
protected WebSocketWrapper(string uri)
_ws = new ClientWebSocket();
_ws.Options.KeepAliveInterval = TimeSpan.FromSeconds(30);
_uri = new Uri(uri);
_cancellationToken = _cancellationTokenSource.Token;
/// <summary>
/// Creates a new instance.
/// </summary>
/// <param name="uri">The URI of the WebSocket server.</param>
/// <returns></returns>
public static WebSocketWrapper Create(string uri)
return new WebSocketWrapper(uri);
/// <summary>
/// Get the current state of the socket
/// </summary>
/// <returns>WebSocketState of the current _ws</returns>
public WebSocketState State()
return _ws.State;
/// <summary>
/// Disconnects from the websocket
/// </summary>
public async Task DisconnectAsync()
try
await _ws.CloseOutputAsync(
WebSocketCloseStatus.NormalClosure,
"Server has been closed by the disconnect method",
_cancellationToken);
CallOnDisconnected();
catch(Exception ex)
throw ex;
finally
_ws.Dispose();
/// <summary>
/// Connects to the WebSocket server.
/// </summary>
/// <returns></returns>
public async Task<WebSocketWrapper> ConnectAsync()
try
await _ws.ConnectAsync(_uri, _cancellationToken);
catch(Exception ex)
CallOnConnected();
RunInTask(() => ProcessQueueAsync());
RunInTask(() => StartListen());
return this;
/// <summary>
/// Set the Action to call when the connection has been established.
/// </summary>
/// <param name="onConnect">The Action to call.</param>
/// <returns></returns>
public WebSocketWrapper OnConnect(Action<WebSocketWrapper> onConnect)
_onConnected = onConnect;
return this;
/// <summary>
/// Set the Action to call when the connection has been terminated.
/// </summary>
/// <param name="onDisconnect">The Action to call</param>
/// <returns></returns>
public WebSocketWrapper OnDisconnect(Action<WebSocketWrapper> onDisconnect)
_onDisconnected = onDisconnect;
return this;
/// <summary>
/// Adds a message to the queu for sending
/// </summary>
/// <param name="message"></param>
public void QueueMessage(TestResultModel message)
//_messageQueue.Enqueue(message);
_messageQueue.Add(message);
/// <summary>
/// returns the size of the current message queue.
/// Usefult for detemning whether or not an messages are left in queue before closing
/// </summary>
/// <returns></returns>
public int QueueCount()
return _messageQueue.Count;
/// <summary>
/// Processes the message queue in order
/// </summary>
public async Task ProcessQueueAsync()
try
foreach(var current in _messageQueue.GetConsumingEnumerable())
await SendMessageAsync(current);
catch(Exception ex)
//TODO
/// <summary>
/// Set the Action to call when a messages has been received.
/// </summary>
/// <param name="onMessage">The Action to call.</param>
/// <returns></returns>
public WebSocketWrapper OnMessage(Action<TestResultModel, WebSocketWrapper> onMessage)
_onMessage = onMessage;
return this;
/// <summary>
/// Send a message to the WebSocket server.
/// </summary>
/// <param name="message">The message to send</param>
public async Task SendMessageAsync(TestResultModel result)
if (_ws.State != WebSocketState.Open)
throw new Exception("Connection is not open.");
var message = JsonConvert.SerializeObject(result);
var messageBuffer = Encoding.UTF8.GetBytes(message);
var messagesCount = (int)Math.Ceiling((double)messageBuffer.Length / SendChunkSize);
for (var i = 0; i < messagesCount; i++)
var offset = (SendChunkSize * i);
var count = SendChunkSize;
var lastMessage = ((i + 1) == messagesCount);
if ((count * (i + 1)) > messageBuffer.Length)
count = messageBuffer.Length - offset;
await _ws.SendAsync(new ArraySegment<byte>(messageBuffer, offset, count), WebSocketMessageType.Text, lastMessage, _cancellationToken);
private async Task StartListen()
var buffer = new byte[ReceiveChunkSize];
//part of a big hack, temporary solution
string prevResult = "";
try
while (_ws.State == WebSocketState.Open)
var stringResult = new StringBuilder();
WebSocketReceiveResult result;
do
result = await _ws.ReceiveAsync(new ArraySegment<byte>(buffer), _cancellationToken);
if (result.MessageType == WebSocketMessageType.Close)
await _ws.CloseAsync(WebSocketCloseStatus.NormalClosure, string.Empty, CancellationToken.None);
CallOnDisconnected();
else
var str = Encoding.UTF8.GetString(buffer, 0, result.Count);
stringResult.Append(str);
while (!result.EndOfMessage);
if (!prevResult.Equals(stringResult.ToString()))
prevResult = stringResult.ToString();
CallOnMessage(stringResult);
catch (Exception ex)
CallOnDisconnected();
finally
_ws.Dispose();
private void CallOnMessage(StringBuilder stringResult)
if (_onMessage != null)
try
var message = JsonConvert.DeserializeObject<TestResultModel>(stringResult.ToString());
RunInTask(() => _onMessage(message, this));
catch (Exception ex)
//Ignore any other messages not TestResultModel
private void CallOnDisconnected()
if (_onDisconnected != null)
RunInTask(() => _onDisconnected(this));
private void CallOnConnected()
if (_onConnected != null)
RunInTask(() => _onConnected(this));
private static Task RunInTask(Action action)
return Task.Factory.StartNew(action);
作为更新,请查看第二次尝试调用 websocket 时的调试屏幕。如您所见,它似乎处于中止状态? (在第一次运行时显然是空的)。有什么想法吗?
【问题讨论】:
【参考方案1】:您正在寻找的许多功能都可以使用 SignalR“轻松”实现。就像您创建一个“集线器”一样,您可以在其中向所有客户端广播或仅选择一个客户端。
我认为您以前有使用 websockets 的经验,所以我认为您应该看看!这有点神奇,在更大的应用程序和服务中进行故障排除可能会很麻烦。
一个简单的聊天教程可以找到here 您还可以查看 here 以了解使用 SignalR 向单个客户端发送通知时遇到问题的人
【讨论】:
不幸的是,正如我提到的,未指定信号器。我相信这个问题是由于静态套接字包装器实例造成的。以上是关于System.Net.WebSockets.ClientWebSocket 不能可靠地工作?的主要内容,如果未能解决你的问题,请参考以下文章