Web Sockets 在快速发送数据时行为不端
Posted
技术标签:
【中文标题】Web Sockets 在快速发送数据时行为不端【英文标题】:Web Sockets misbehave when sending data very fast 【发布时间】:2017-03-23 07:45:25 【问题描述】:我是 WebSockets 的新手,所以如果是我的错,我很抱歉,但是当我在 while 循环(只是为了测试)中将数据从 WebSocket(在 chrome 中)发送到服务器(C#、TCPListener)时,数据确实没有正确到达服务器。它的第一个字节通常消失了,有时密钥也会在两条消息之间混合。这导致我的服务器崩溃。当数据不规则地发送或一个接一个地发送速度不快时,这种情况永远不会发生。
这是服务器接收器代码:
#region Receiver
public class Receiver
private const int DEFAULT_DATA_LIMIT = 256; // Bytes
private const int HEADER_LENGTH = 2;
private const int KEYS_LENGTH = 4;
private const int SHORT_BYTES = 2;
private const int LONG_BYTES = 8;
private const int SHORT_DATA = 126;
private const int LONG_DATA = 127;
private readonly Socket TargetSocket;
private readonly object DataLock = new object();
private readonly object amountOfDataReceivedLock = new object();
private readonly object RecevingLock = new object();
private readonly object CompletingRecieveLock = new object();
private bool receiving;
private int _amoutOfDataReceived;
private byte[] Data;
private int amountOfDataReceived
get
lock(amountOfDataReceivedLock)
return _amoutOfDataReceived;
set
lock(amountOfDataReceivedLock)
_amoutOfDataReceived = value;
public int Capacity
get
return Data.Length;
public int FreeSpace
get
return Capacity - amountOfDataReceived;
public bool ReceivingData
get
return receiving;
public bool ChunkReady
get
int wholeChunkLength = WholeChunkLength();
if(wholeChunkLength == -1)
return false;
return WholeChunkLength() <= amountOfDataReceived;
public Receiver(Socket socket, int dataLimit)
this.TargetSocket = socket;
Data = new byte[dataLimit];
public Receiver(Socket socket) : this(socket, DEFAULT_DATA_LIMIT)
private int ChunkDataLength()
lock(DataLock)
lock(amountOfDataReceivedLock)
if(amountOfDataReceived < 1)
return -1;
int lengthInfoIndex = HEADER_LENGTH - 1;
if(amountOfDataReceived < lengthInfoIndex + 1)
return -1;
int rawLength = Data[lengthInfoIndex] -128;
int bytesRequiredToGetLength = 0;
if(rawLength == SHORT_DATA)
bytesRequiredToGetLength = SHORT_BYTES;
else if(rawLength == LONG_DATA)
bytesRequiredToGetLength = LONG_BYTES;
else
return rawLength;
if(amountOfDataReceived < lengthInfoIndex + 1 + bytesRequiredToGetLength)
return -1;
return Utilities.ToInt32(Data, lengthInfoIndex + 1, bytesRequiredToGetLength);
private int WholeChunkLength()
int dataLength = ChunkDataLength();
int lengthInfoLength = dataLength < SHORT_DATA ? 0 : dataLength < short.MaxValue ? SHORT_BYTES : LONG_BYTES;
if(dataLength == -1)
return -1;
return HEADER_LENGTH + lengthInfoLength + KEYS_LENGTH + dataLength;
private void ReceiveDataInternal(int dataToReceiveLength)
if(dataToReceiveLength == 0)
return;
lock(RecevingLock)
if(receiving)
return;
receiving = true;
if(dataToReceiveLength > FreeSpace)
dataToReceiveLength = FreeSpace;
TargetSocket.BeginReceive(Data, amountOfDataReceived, dataToReceiveLength, SocketFlags.None, result =>
OnRecevingComplete(result, dataToReceiveLength);
, null);
private void OnRecevingComplete(System.IAsyncResult result, int receivedDataLength)
lock(CompletingRecieveLock) // This is not needed really
TargetSocket.EndReceive(result);
this.amountOfDataReceived += receivedDataLength;
receiving = false;
public void ReceiveData()
ReceiveDataInternal(TargetSocket.Available);
public byte[] GetChunk()
lock(DataLock)
lock(amountOfDataReceivedLock)
int chunkLength = WholeChunkLength();
if(chunkLength == -1 || chunkLength > amountOfDataReceived)
return null;
// throw new System.InvalidOperationException("Chunk is yet not ready!");
byte[] chunk = new byte[chunkLength];
for(int i = 0; i < chunkLength; i++)
chunk[i] = Data[i];
ArrayUtilities<byte>.ShiftArrayLeft(Data, chunkLength, amountOfDataReceived);
amountOfDataReceived -= chunkLength;
return chunk;
#endregion
这里是调用Receiver类的函数
private void Update()
if(!running)
return;
for(int i = 0; i < clients.TotalClients; i++)
if(!clients[i].IsReady)
continue;
var recievier = clients[i].GetReciever();
if(recievier.recievier.FreeSpace > 0 && clients[i].DataAvaliable && !recievier.Receiving)
recievier.ReceiveData();
if(recievier.ChunkReady)
var data = recievier.GetChunk();
Utilities.WSFormatter.DecodeMessage(data);
int SI = Utilities.WSFormatter.MessageStartIndex(data);
System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(data, SI, data.Length - SI));
函数本身被这样的东西调用(在一个新线程上,而不是默认线程上):
while (true)
Update();
System.Threading.Thread.Sleep(1);
这里是 WebSocket 代码:
Engine.Loader.loadEngine(function()
var client = new Engine.Client("ws:192.168.1.105:8080");
var connected = false;
client.addConnectListener(function()
connected = true;
console.log("Connection successful!");
);
client.addRecieveListener(function(data)
console.log(data);
);
var gl = new Engine.GameLoop(new Engine.Renderer(), new Engine.Input(), client);
gl.start();
var i = 0;
gl.addEventListener("UPDATE", function()
/*The code that works*/
if(connected)
client.sendString("" + i++);
/*The code that causes problems*/
var j = 10;
while(connected && j-- > 0)
client.sendString("" + i++);
);
);
编辑:UPDATE 事件由 window.requestAnimationFrame 调用
编辑:我之前发布的答案也失败了。事实证明,它仅在数据 [] 在错误发生之前被填充时才有效。因此,如果我增加 data[] 的容量,我还需要增加接收数据之间的时间。
编辑:让整个工作正常。其中有不止一个问题。事实证明,数据正在被内部的 Socket.BeginReceive() 方法和我同时调用的 DataReceiver.GetChunk() 所修改。 所以现在我首先在临时缓冲区中接收数据,然后在触发完成事件的同时将其写入主缓冲区,同时锁定它,这样其他线程就不会弄乱它(据我所知,没有调用完整方法调用 BeginRecieve 函数的线程)。
正如@vtortola 所说,当触发完整事件时,所有数据可能不在缓冲区中,所以我也考虑过。
这是实际工作的脚本。它是重写的,但有些方法是从旧脚本中复制的(是的,我很懒):
public class DataReceiver
private const int DEFAULT_DIRECT_BUFFER_LIMIT = 256;
private const int DEFAULT_DATA_LIMIT = 256; // bytes
private const int HEADER_LENGTH = 2;
private const int KEYS_LENGTH = 4;
private const int SHORT_DATA = 126;
private const int SHORT_BYTES = 2;
private const int LONG_DATA = 127;
private const int LONG_BYTES = 8;
private readonly Socket TargetSocket;
private readonly object dataUpdatingLock = new object();
private bool receivingData;
private byte[] directRecieveBuffer;
private int dataReceived;
private byte[] data;
public int Capacity
get
return data.Length;
public int AmountOfDataReceived
get
return dataReceived;
public int FreeSpace
get
return Capacity - AmountOfDataReceived;
public bool ReceivingData
get
return receivingData;
public bool ChunkReady
get
int chunkLength = WholeChunkLength();
if(chunkLength < 1)
return false;
return chunkLength <= dataReceived;
private DataReceiver(Socket socket, int bufferLength, int directBufferLength)
this.TargetSocket = socket;
this.data = new byte[bufferLength];
this.directRecieveBuffer = new byte[directBufferLength];
public DataReceiver(Socket socket, int bufferLength) : this(socket, bufferLength, DEFAULT_DIRECT_BUFFER_LIMIT)
public DataReceiver(Socket socket) : this(socket, DEFAULT_DATA_LIMIT, DEFAULT_DIRECT_BUFFER_LIMIT)
private void ReceiveDataInternally()
receivingData = true;
int expectedDataLength = TargetSocket.Available;
if(expectedDataLength > FreeSpace)
expectedDataLength = FreeSpace;
if(expectedDataLength > directRecieveBuffer.Length)
expectedDataLength = directRecieveBuffer.Length;
TargetSocket.BeginReceive(directRecieveBuffer, 0, expectedDataLength, SocketFlags.None, result =>
int receivedDataLength = TargetSocket.EndReceive(result);
lock(dataUpdatingLock)
for(int i = 0; i < receivedDataLength; i++)
data[dataReceived++] = directRecieveBuffer[i];
directRecieveBuffer[i] = 0;
receivingData = false;
, null);
public byte[] GetChunk()
int chunkLength = WholeChunkLength();
if(chunkLength == -1 || chunkLength > dataReceived)
return null;
byte[] chunk = new byte[chunkLength];
for(int i = 0; i < chunkLength; i++)
chunk[i] = data[i];
lock(dataUpdatingLock)
ArrayUtilities<byte>.ShiftArrayLeft(data, chunkLength, dataReceived);
dataReceived -= chunkLength;
return chunk;
private int ChunkDataLength()
if(dataReceived < 1)
return -1;
int lengthInfoIndex = HEADER_LENGTH - 1;
if(dataReceived < HEADER_LENGTH)
return -1;
int rawLength = data[lengthInfoIndex] & 127;
int bytesRequiredToGetLength = 0;
if(rawLength == SHORT_DATA)
bytesRequiredToGetLength = SHORT_BYTES;
else if(rawLength == LONG_DATA)
bytesRequiredToGetLength = LONG_BYTES;
else
return rawLength;
if(dataReceived < HEADER_LENGTH + bytesRequiredToGetLength)
return -1;
return Utilities.ToInt32(data, lengthInfoIndex + 1, bytesRequiredToGetLength);
private int WholeChunkLength()
int dataLength = ChunkDataLength();
int lengthInfoLength = dataLength < SHORT_DATA ? 0 : dataLength < short.MaxValue ? SHORT_BYTES : LONG_BYTES;
if(dataLength == -1)
return -1;
return HEADER_LENGTH + lengthInfoLength + KEYS_LENGTH + dataLength;
public void StartReceving()
if(!receivingData)
ReceiveDataInternally();
【问题讨论】:
你知道TCP是一个流吗?因此,可以将多条消息作为一条消息接收。如果您在短时间内发送多条消息,就会发生这种情况。它们被合并。此外,一条消息可能会拆分为多个接收。client.sendString("" + i++);
长度是多少?你怎么知道下一条消息从哪里开始?
Web 套接字具有特定的格式,每个新消息以 129 开头,后跟实际消息的长度,然后是键,然后是实际消息。我通过以下头长度字节获得长度(1字节(用于文本帧),1字节(实际数据的长度信息),4字节键,然后将它们添加到缓冲区的第二个字节获得的长度) .我知道 TCP 可以接收许多消息,并让我的接收者处理它,但它仍然没有解释为什么文本框架是最好的以及为什么键是无效的。
我知道新消息何时开始由上一条消息的长度。
你能不能把你用来调用Receiver
类的代码也放上去?这么多的锁表明你在做一些鲁莽的事情,比如从多个线程中读取。
@vtortola 我(目前)仅通过一个线程调用 Receiver 类。但我肯定打算在项目的后期使用多个线程(我仍然很容易使用线程,但这是一个刚刚学习的项目:)).....更新了问题中的代码!
【参考方案1】:
当您调用 recievier.ReceiveData();
时,您假设在该方法完成时已读取数据,但这是不对的。对该方法的调用仅启动读取,该读取将在调用OnRecevingComplete
的匿名委托上异步结束。当应用程序负载比简单调用更多时,这将更加明显。如果您使用TcpListener
,并使用async/await
而不是Socket
以及BeginXXX
和EndXXX
方法,会更容易。
您似乎还假设一旦完成读取,您的所有数据都在缓冲区中,但可能不是。例如,要读取 6 个字节,您可能需要读取 6 次,或者 3 次,或者可能只读取 1 次。您需要在某处使用 while
循环,该循环不断将数据读取到缓冲区,直到您完成所有预期的有效负载。
在此处查看示例:How can I read from a socket repeatedly?
public void ContinuousReceive()
byte[] buffer = new byte[1024];
bool terminationCodeReceived = false;
while(!terminationCodeReceived)
try
if(server.Receive(buffer)>0)
// We got something
// Parse the received data and check if the termination code
// is received or not
catch (SocketException e)
Console.WriteLine("Oops! Something bad happened:" + e.Message);
附注:
return Utilities.ToInt32(Data, lengthInfoIndex + 1, bytesRequiredToGetLength);
Int32
有 4 个字节,所以如果你将 8 个字节解析为Int32
,它可能会溢出。另外,请记住长度为ushort
、uint
和ulong
(即:无符号整数),具体取决于标题是否指示
您无法使用多个线程读取,也无法使用套接字上的多个线程进行写入。您可以同时从独立的读取和写入,但不能同时执行相同的操作。这将导致不可预测的错误。你不应该有任何锁并且认为这个类不是线程安全的(例如它是NetworkStream
)。多线程不会在网络编程中带来更好的性能,只是不要。相反,请专注于使您的代码异步。
Good old book that I loved.
【讨论】:
谢谢@vtortola!我的 Utilities.Int32 方法处理的字节数超过了所需的字节数。我只是懒得提供 4 个字节作为参数,但一切正常,没有溢出,它给出了正确的结果:)。你说 italic 当你调用 recievier.ReceiveData(); ,您假设在该方法完成时已读取数据,但这是不对的。 italic 但我检查了接收器是否已读取数据。 ChunkReady 属性。但是问题似乎是因为您在第二段中所说的。我会按照你说的重写课程! 好的,它工作了@vtortola 感谢您的帮助。问题很多,不止一个。我已经用工作脚本更新了问题!正如你所说,我已经移除了锁,并且只会在一个线程中使用它。以上是关于Web Sockets 在快速发送数据时行为不端的主要内容,如果未能解决你的问题,请参考以下文章
iOS 中的 MIDI 合成行为不端 WRT 弯音:忽略 LSB
带有大标题的 UISearchController,UI 在关闭时行为不端