Web Sockets 在快速发送数据时行为不端

Posted

技术标签:

【中文标题】Web Sockets 在快速发送数据时行为不端【英文标题】:Web Sockets misbehave when sending data very fast 【发布时间】:2017-03-23 07:45:25 【问题描述】:

我是 WebSockets 的新手,所以如果是我的错,我很抱歉,但是当我在 while 循环(只是为了测试)中将数据从 WebSocket(在 chrome 中)发送到服务器(C#、TCPListener)时,数据确实没有正确到达服务器。它的第一个字节通常消失了,有时密钥也会在两条消息之间混合。这导致我的服务器崩溃。当数据不规则地发送或一个接一个地发送速度不快时,这种情况永远不会发生。

这是服务器接收器代码:

#region Receiver

    public class Receiver
    
        private const int DEFAULT_DATA_LIMIT = 256; // Bytes

        private const int HEADER_LENGTH = 2;
        private const int KEYS_LENGTH = 4;

        private const int SHORT_BYTES = 2;
        private const int LONG_BYTES = 8;

        private const int SHORT_DATA = 126;
        private const int LONG_DATA = 127;

        private readonly Socket TargetSocket;

        private readonly object DataLock = new object();
        private readonly object amountOfDataReceivedLock = new object();
        private readonly object RecevingLock = new object();
        private readonly object CompletingRecieveLock = new object();

        private bool receiving;

        private int _amoutOfDataReceived;
        private byte[] Data;

        private int amountOfDataReceived
        
            get
            
                lock(amountOfDataReceivedLock)
                    return _amoutOfDataReceived;
            
            set
            
                lock(amountOfDataReceivedLock)
                    _amoutOfDataReceived = value;
            
        

        public int Capacity
        
            get
            
                return Data.Length;
            
        

        public int FreeSpace
        
            get
            
                return Capacity - amountOfDataReceived;
            
        

        public bool ReceivingData
        
            get
            
                return receiving;
            
        

        public bool ChunkReady
        
            get
            
                int wholeChunkLength = WholeChunkLength();

                if(wholeChunkLength == -1)
                    return false;

                return WholeChunkLength() <= amountOfDataReceived;
            
        

        public Receiver(Socket socket, int dataLimit)
        
            this.TargetSocket = socket;
            Data = new byte[dataLimit];
        

        public Receiver(Socket socket) : this(socket, DEFAULT_DATA_LIMIT)  

        private int ChunkDataLength()
        
            lock(DataLock)
            
                lock(amountOfDataReceivedLock)
                
                    if(amountOfDataReceived < 1)
                        return -1;

                    int lengthInfoIndex = HEADER_LENGTH - 1;

                    if(amountOfDataReceived < lengthInfoIndex + 1)
                        return -1;

                    int rawLength = Data[lengthInfoIndex] -128;

                    int bytesRequiredToGetLength = 0;

                    if(rawLength == SHORT_DATA)
                        bytesRequiredToGetLength = SHORT_BYTES;
                    else if(rawLength == LONG_DATA)
                        bytesRequiredToGetLength = LONG_BYTES;
                    else
                        return rawLength;

                    if(amountOfDataReceived < lengthInfoIndex + 1 + bytesRequiredToGetLength)
                        return -1;

                    return Utilities.ToInt32(Data, lengthInfoIndex + 1, bytesRequiredToGetLength);
                
            
        

        private int WholeChunkLength()
        
            int dataLength = ChunkDataLength();
            int lengthInfoLength = dataLength < SHORT_DATA ? 0 : dataLength < short.MaxValue ? SHORT_BYTES : LONG_BYTES;

            if(dataLength == -1)
                return -1;

            return HEADER_LENGTH + lengthInfoLength + KEYS_LENGTH + dataLength;
        

        private void ReceiveDataInternal(int dataToReceiveLength)
        
            if(dataToReceiveLength == 0)
                return;

            lock(RecevingLock)
            
                if(receiving)
                    return;

                receiving = true;
            

            if(dataToReceiveLength > FreeSpace)
                dataToReceiveLength = FreeSpace;

            TargetSocket.BeginReceive(Data, amountOfDataReceived, dataToReceiveLength, SocketFlags.None, result =>
            
                OnRecevingComplete(result, dataToReceiveLength);
            , null);

        

        private void OnRecevingComplete(System.IAsyncResult result, int receivedDataLength)
        
            lock(CompletingRecieveLock) // This is not needed really
            
                TargetSocket.EndReceive(result);
                this.amountOfDataReceived += receivedDataLength;
                receiving = false;
            
        

        public void ReceiveData()
        
            ReceiveDataInternal(TargetSocket.Available);
        

        public byte[] GetChunk()
        
            lock(DataLock)
            
                lock(amountOfDataReceivedLock)
                
                    int chunkLength = WholeChunkLength();

                    if(chunkLength == -1 || chunkLength > amountOfDataReceived)
                        return null;
                     //                            throw new System.InvalidOperationException("Chunk is yet not ready!");

                    byte[] chunk = new byte[chunkLength];

                    for(int i = 0; i < chunkLength; i++)
                        chunk[i] = Data[i];

                    ArrayUtilities<byte>.ShiftArrayLeft(Data, chunkLength, amountOfDataReceived);
                    amountOfDataReceived -= chunkLength;

                    return chunk;
                
            
        

    

    #endregion

这里是调用Receiver类的函数

private void Update()
    
        if(!running)
            return;

        for(int i = 0; i < clients.TotalClients; i++)
        
            if(!clients[i].IsReady)
                continue;

            var recievier = clients[i].GetReciever();

            if(recievier.recievier.FreeSpace > 0 && clients[i].DataAvaliable && !recievier.Receiving)
                recievier.ReceiveData();

            if(recievier.ChunkReady)
            
                var data = recievier.GetChunk();

                Utilities.WSFormatter.DecodeMessage(data);

                int SI = Utilities.WSFormatter.MessageStartIndex(data);

                System.Console.WriteLine(System.Text.Encoding.UTF8.GetString(data, SI, data.Length - SI));
            

        
    

函数本身被这样的东西调用(在一个新线程上,而不是默认线程上):

while (true)

    Update();
    System.Threading.Thread.Sleep(1);

这里是 WebSocket 代码:

        Engine.Loader.loadEngine(function() 

        var client = new Engine.Client("ws:192.168.1.105:8080");
        var connected = false;

        client.addConnectListener(function()
        
            connected = true;
            console.log("Connection successful!");              
        );

        client.addRecieveListener(function(data)
        
            console.log(data);
        );

        var gl = new Engine.GameLoop(new Engine.Renderer(), new Engine.Input(), client);
        gl.start();

        var i = 0;

        gl.addEventListener("UPDATE", function()
        
            /*The code that works*/

            if(connected)
                client.sendString("" + i++);

            /*The code that causes problems*/

            var j = 10;

            while(connected && j-- > 0)
                client.sendString("" + i++);

        );

    );

编辑:UPDATE 事件由 window.requestAnimationFrame 调用

编辑:我之前发布的答案也失败了。事实证明,它仅在数据 [] 在错误发生之前被填充时才有效。因此,如果我增加 data[] 的容量,我还需要增加接收数据之间的时间。

编辑:让整个工作正常。其中有不止一个问题。事实证明,数据正在被内部的 Socket.BeginReceive() 方法和我同时调用的 DataReceiver.GetChunk() 所修改。 所以现在我首先在临时缓冲区中接收数据,然后在触发完成事件的同时将其写入主缓冲区,同时锁定它,这样其他线程就不会弄乱它(据我所知,没有调用完整方法调用 BeginRecieve 函数的线程)。

正如@vtortola 所说,当触发完整事件时,所有数据可能不在缓冲区中,所以我也考虑过。

这是实际工作的脚本。它是重写的,但有些方法是从旧脚本中复制的(是的,我很懒):

public class DataReceiver

private const int DEFAULT_DIRECT_BUFFER_LIMIT = 256;
private const int DEFAULT_DATA_LIMIT = 256; // bytes

private const int HEADER_LENGTH = 2;
private const int KEYS_LENGTH = 4;

private const int SHORT_DATA = 126;
private const int SHORT_BYTES = 2;

private const int LONG_DATA = 127;
private const int LONG_BYTES = 8;

private readonly Socket TargetSocket;

private readonly object dataUpdatingLock = new object();

private bool receivingData;

private byte[] directRecieveBuffer;

private int dataReceived;
private byte[] data;

public int Capacity

    get
    
        return data.Length;
    


public int AmountOfDataReceived

    get
    
        return dataReceived;
    


public int FreeSpace

    get
    
        return Capacity - AmountOfDataReceived;
    


public bool ReceivingData

    get
    
        return receivingData;
    


public bool ChunkReady

    get
    
        int chunkLength = WholeChunkLength();

        if(chunkLength < 1)
            return false;

        return chunkLength <= dataReceived;
    


private DataReceiver(Socket socket, int bufferLength, int directBufferLength)

    this.TargetSocket = socket;
    this.data = new byte[bufferLength];
    this.directRecieveBuffer = new byte[directBufferLength];


public DataReceiver(Socket socket, int bufferLength) : this(socket, bufferLength, DEFAULT_DIRECT_BUFFER_LIMIT)  

public DataReceiver(Socket socket) : this(socket, DEFAULT_DATA_LIMIT, DEFAULT_DIRECT_BUFFER_LIMIT)  

private void ReceiveDataInternally()

    receivingData = true;

    int expectedDataLength = TargetSocket.Available;

    if(expectedDataLength > FreeSpace)
        expectedDataLength = FreeSpace;

    if(expectedDataLength > directRecieveBuffer.Length)
        expectedDataLength = directRecieveBuffer.Length;

    TargetSocket.BeginReceive(directRecieveBuffer, 0, expectedDataLength, SocketFlags.None, result =>
    
        int receivedDataLength = TargetSocket.EndReceive(result);

        lock(dataUpdatingLock)
        
            for(int i = 0; i < receivedDataLength; i++)
            
                data[dataReceived++] = directRecieveBuffer[i];
                directRecieveBuffer[i] = 0;
            
        

        receivingData = false;

    , null);


public byte[] GetChunk()

    int chunkLength = WholeChunkLength();

    if(chunkLength == -1 || chunkLength > dataReceived)
        return null;

    byte[] chunk = new byte[chunkLength];

    for(int i = 0; i < chunkLength; i++)
        chunk[i] = data[i];

    lock(dataUpdatingLock)
    
        ArrayUtilities<byte>.ShiftArrayLeft(data, chunkLength, dataReceived);
        dataReceived -= chunkLength;
    

    return chunk;


private int ChunkDataLength()

    if(dataReceived < 1)
        return -1;

    int lengthInfoIndex = HEADER_LENGTH - 1;

    if(dataReceived < HEADER_LENGTH)
        return -1;

    int rawLength = data[lengthInfoIndex] & 127;

    int bytesRequiredToGetLength = 0;

    if(rawLength == SHORT_DATA)
        bytesRequiredToGetLength = SHORT_BYTES;
    else if(rawLength == LONG_DATA)
        bytesRequiredToGetLength = LONG_BYTES;
    else
        return rawLength;

    if(dataReceived < HEADER_LENGTH + bytesRequiredToGetLength)
        return -1;

    return Utilities.ToInt32(data, lengthInfoIndex + 1, bytesRequiredToGetLength);


private int WholeChunkLength()

    int dataLength = ChunkDataLength();
    int lengthInfoLength = dataLength < SHORT_DATA ? 0 : dataLength < short.MaxValue ? SHORT_BYTES : LONG_BYTES;

    if(dataLength == -1)
        return -1;

    return HEADER_LENGTH + lengthInfoLength + KEYS_LENGTH + dataLength;


public void StartReceving()

    if(!receivingData)
        ReceiveDataInternally();


【问题讨论】:

你知道TCP是一个流吗?因此,可以将多条消息作为一条消息接收。如果您在短时间内发送多条消息,就会发生这种情况。它们被合并。此外,一条消息可能会拆分为多个接收。 client.sendString("" + i++); 长度是多少?你怎么知道下一条消息从哪里开始? Web 套接字具有特定的格式,每个新消息以 129 开头,后跟实际消息的长度,然后是键,然后是实际消息。我通过以下头长度字节获得长度(1字节(用于文本帧),1字节(实际数据的长度信息),4字节键,然后将它们添加到缓冲区的第二个字节获得的长度) .我知道 TCP 可以接收许多消息,并让我的接收者处理它,但它仍然没有解释为什么文本框架是最好的以及为什么键是无效的。 我知道新消息何时开始由上一条消息的长度。 你能不能把你用来调用Receiver类的代码也放上去?这么多的锁表明你在做一些鲁莽的事情,比如从多个线程中读取。 @vtortola 我(目前)仅通过一个线程调用 Receiver 类。但我肯定打算在项目的后期使用多个线程(我仍然很容易使用线程,但这是一个刚刚学习的项目:)).....更新了问题中的代码! 【参考方案1】:

当您调用 recievier.ReceiveData(); 时,您假设在该方法完成时已读取数据,但这是不对的。对该方法的调用仅启动读取,该读取将在调用OnRecevingComplete 的匿名委托上异步结束。当应用程序负载比简单调用更多时,这将更加明显。如果您使用TcpListener,并使用async/await 而不是Socket 以及BeginXXXEndXXX 方法,会更容易。

您似乎还假设一旦完成读取,您的所有数据都在缓冲区中,但可能不是。例如,要读取 6 个字节,您可能需要读取 6 次,或者 3 次,或者可能只读取 1 次。您需要在某处使用 while 循环,该循环不断将数据读取到缓冲区,直到您完成所有预期的有效负载。

在此处查看示例:How can I read from a socket repeatedly?

  public void ContinuousReceive()
    byte[] buffer = new byte[1024];
    bool terminationCodeReceived = false;
    while(!terminationCodeReceived)
      try
          if(server.Receive(buffer)>0)
             // We got something
             // Parse the received data and check if the termination code
             // is received or not
          
      catch (SocketException e)
          Console.WriteLine("Oops! Something bad happened:" + e.Message);
      
    
  

附注:

return Utilities.ToInt32(Data, lengthInfoIndex + 1, bytesRequiredToGetLength);

Int32 有 4 个字节,所以如果你将 8 个字节解析为Int32,它可能会溢出。另外,请记住长度为ushortuintulong(即:无符号整数),具体取决于标题是否指示

您无法使用多个线程读取,也无法使用套接字上的多个线程进行写入。您可以同时从独立的读取和写入,但不能同时执行相同的操作。这将导致不可预测的错误。你不应该有任何锁并且认为这个类不是线程安全的(例如它是NetworkStream)。多线程不会在网络编程中带来更好的性能,只是不要。相反,请专注于使您的代码异步。

Good old book that I loved.

【讨论】:

谢谢@vtortola!我的 Utilities.Int32 方法处理的字节数超过了所需的字节数。我只是懒得提供 4 个字节作为参数,但一切正常,没有溢出,它给出了正确的结果:)。你说 italic 当你调用 recievier.ReceiveData(); ,您假设在该方法完成时已读取数据,但这是不对的。 italic 但我检查了接收器是否已读取数据。 ChunkReady 属性。但是问题似乎是因为您在第二段中所说的。我会按照你说的重写课程! 好的,它工作了@vtortola 感谢您的帮助。问题很多,不止一个。我已经用工作脚本更新了问题!正如你所说,我已经移除了锁,并且只会在一个线程中使用它。

以上是关于Web Sockets 在快速发送数据时行为不端的主要内容,如果未能解决你的问题,请参考以下文章

百科知识 学位论文学术不端行为检测系统简介

iOS 中的 MIDI 合成行为不端 WRT 弯音:忽略 LSB

带有大标题的 UISearchController,UI 在关闭时行为不端

集合视图,具有自定义布局,单元格在滚动时行为不端

使用 Laravel 5.3 和 AngularJS 1.5 的路线行为不端

连接两个具有间隔的数据帧行为不端?