C++ websocket 服务器处理消息碎片

Posted

技术标签:

【中文标题】C++ websocket 服务器处理消息碎片【英文标题】:C++ websocket server handling message fragmentation 【发布时间】:2018-03-11 03:03:31 【问题描述】:

我正在尝试通过 websocket 从一个 javascript/html 客户端向另一个客户端发送图像。问题是服务器错误地接收图像。我将所有图像作为文本中的数据 URI 发送,这样当 javascript 客户端接收到它时,它可以简单地将 img 的 src 设置为 URI。问题(我相信)来自我如何处理消息碎片。发送简单的文本消息工作正常,所以我相信它是导致问题的消息的大小,唯一的主要代码区别是我如何处理消息碎片。从this 文档中,我相信必须做的就是揭开每个分段帧的有效负载并将缓冲区连接在一起。在服务器上读取的 URI 比图像的实际数据 URI 短得多。在客户端,我所做的只是调用 socket.send() 函数。我已经确认我在 javascript FileReader 中读取的数据 URI 是正确的(在客户端)。

    int wSock::readData(/*input socket data buffer*/ char ** sockp, /*output payload*/ char ** buffer, /*output payload info*/ WebSocketFrameData * data) 
    char * sock = *sockp;
    if (!webSocketIsOpened(sock)) return -32; //checks if the socket is open
    u_long package_size;
    SOCKET socket;
    size_t dataRead = 0;
    size_t dr = 0;
    size_t firstLength = 0;
    memcpy_s(&socket, 4, sock, 4);
    ioctlsocket(socket, FIONREAD, &package_size);
    if (package_size <= 0) return 1;
    char * buf = new char[package_size + 1];
    while (dataRead < package_size) 
        dr = recv(socket, buf + dataRead, package_size - dataRead, NULL);
        if (dr == SOCKET_ERROR) 
            delete[] buf; 
            return WSAGetLastError();
        
        dataRead += dr;
    
    *(buf + package_size) = '\0';
    if (package_size > 0) 
        decodeFrame(buf, buffer, &firstLength);
        if (data != NULL) 
            data->payloadLength = firstLength;
            data->opcode = *buf & 0b00001111;
        
    
    else return 1;

    // code handling other opcodes such as a close frame or a ping
    
    char fin = (*buf) >> 7;
    if (!fin)  //start handling message fragmentation
        printf("Fragmentation! \n");
        FD_SET tempRead;
        size_t totalLength = firstLength -1; //firstLength includes the null terminator
        char * combinedPayloads = new char[totalLength];
        memcpy_s(combinedPayloads, totalLength, *buffer, totalLength);
        printf("First frage of size: %u \n", totalLength);
        while (fin != 1) 
            FD_ZERO(&tempRead);
            FD_SET(socket, &tempRead);
            select(0, &tempRead, NULL, NULL, NULL);

            package_size = 0;
            ioctlsocket(socket, FIONREAD, &package_size);
            printf("Reading next frag of size: %u \n", package_size);
            char * contBuf = new char[package_size];
            dataRead = 0;
            while (dataRead < package_size) 
                dr = recv(socket, contBuf + dataRead, package_size - dataRead, NULL);
                if (dr == SOCKET_ERROR)  
                    delete[] contBuf; 
                    return WSAGetLastError();
                
                dataRead += dr;
            
            char * payload;
            size_t payloadLength = 0;
            decodeFrame(contBuf, &payload, &payloadLength);
            payloadLength--; //the output payloadLength from the decodeFrame function includes a null terminator
            char * backBuffer = new char[totalLength];
            memcpy_s(backBuffer, totalLength, combinedPayloads, totalLength);
            delete[] combinedPayloads;

            combinedPayloads = new char[totalLength + payloadLength];
            memcpy_s(combinedPayloads, totalLength, backBuffer, totalLength);
            memcpy_s(combinedPayloads + totalLength, payloadLength, payload, payloadLength);
            fin = contBuf[0] >> 7;
            totalLength += payloadLength;
            delete[] backBuffer;
            delete[] contBuf;
            delete[] payload;
            if (fin) break;
        
        delete[] *buffer;
        *buffer = new char[totalLength + 1];
        memcpy_s(*buffer, totalLength, combinedPayloads, totalLength);
        (*buffer)[totalLength] = '\0';
        delete[] combinedPayloads;
        data->payloadLength = totalLength;
        printf("Finished fragment! Total size: %u \n", totalLength);
    
    delete[] buf;
    return 0;

这是解码每个 websocket 帧的代码。正如我提到的,服务器可以很好地处理较小的聊天消息,所以我认为问题是消息重新组装,但我将包含 decodeFrame 函数,希望它有助于理解。

    int wSock::decodeFrame(char * message, char ** output, size_t * payloadLength)

    char read;
    memcpy_s(&read, 1, message + 1, 1);
    unsigned long long size = read & 0b01111111;
    //takes bits 9 - 15;
    int lastByte = 2;
    if (size == 126) 
        unsigned short rr;
        memcpy_s(&rr, 2, message + 2, 2);
        size = ntohs(rr);
        lastByte = 4;
    
    else if (size == 127) 
        unsigned long long data;
        memcpy_s(&data, 8, message + 2, 8);
        size = ntohll(data);
        lastByte = 10;
    
    if(payloadLength != NULL)
        *payloadLength = size + 1;
    char mask[4];
    memcpy_s(mask, 4, message + lastByte, 4);
    *output = new char[(size + 1)];
    lastByte += 4;
    for (int i = 0; i < size; i++) 
        (*output)[i] = message[lastByte + i] ^ mask.mask[i % 4];
    
    (*output)[size] = '\0';
    return 0;

在服务器端进行调试,我将读取的消息写入文本文件。但是,写入的 URI 只有大约 4,000 - 6,000 个字符长,最后 200 - 400 个字符不是有效的 base64 字符,但是这些无效字符之前的字符 do 匹配它们对应的真实字符数据 URI。重新组装过程中的 printf 语句将倾向于读取大约 262,368 个字节(总计),而实际 URI 的长度为 389,906 个字符。在读取 URI 后,服务器将其发送给客户端,这会导致它们断开连接。因此,正如我所提到的,我的猜测是,当我重新组装数据帧时出现了问题。任何帮助将不胜感激。

【问题讨论】:

【参考方案1】:
ioctlsocket(socket, FIONREAD, &package_size);

FIONREAD 返回可以在没有阻塞的情况下读取的字节数。这意味着在这行代码之后的recv() 循环完全是徒劳的一个 recv() 会读取这么多的数据。不可能。

您也没有正确处理流的结尾(recv() 返回零)。

【讨论】:

谢谢,我不知道 FIONREAD,但我应该怎么做才能正确处理流的结束?循环直到recv() 返回 0?如果是这样,那似乎毫无意义,类似于我已经拥有的。抱歉,如果这似乎是一个愚蠢的问题,我不是很有经验。 直到recv() 返回零,我才说循环。你在这里与自己对话。我只是说你没有正确处理那个案子。你应该循环阅读直到你得到你所期望的,recv()返回零。【参考方案2】:

好的,我想通了。我忘记考虑的是 TCP 消息碎片。正如@EJP 提到的,ioctlsocket 仅返回可以在 one 单个 recv() 调用中读取的字节数。我将接收到的每个数据片段都视为其自己的 WebSocket 帧,但情况并非总是如此。通常(几乎一直)单个recv() 调用只会读取部分帧,并且在第二个recv() 调用时会读取第一帧的下一部分和第二帧的第一部分。然后第二个缓冲区(现在是两个不同的不完整帧的混合)显然不会被正确地去掩码,并且解码的大小会不正确。 javascript 客户端将每个 WebSocket 帧分割成大约 131K 字节,底层 TCP 层会将这些帧进一步分割成大约 65K 字节的数据包。所以我所做的是我在一个 recv() 调用中收到了所有我能收到的数据,然后使用如下函数:

    unsigned long long wSock::decodeTotalFrameSize(char * frame)

    char secondByte = 0;
    memcpy_s(&secondByte, 1, frame + 1, 1);
    unsigned long long size = secondByte & 0b01111111;
    int headerSize = 2 + 4;
    if (size == 126) 
        unsigned short length;
        memcpy_s(&length, 2, frame + 2, 2);
        size = ntohs(length);
        headerSize += 2;
    
    else if (size == 127) 
        unsigned long long length;
        memcpy_s(&length, 8, frame + 2, 8);
        size = ntohll(length);
        headerSize += 8;
    
    return size + headerSize;

获取总的 WebSocket 帧大小。然后循环,直到您将该数量的字节读入单个帧。类似于:

            FD_ZERO(&tempRead);
            FD_SET(socket, &tempRead);
            select(0, &tempRead, NULL, NULL, NULL);

            package_size = 0;
            ioctlsocket(socket, FIONREAD, &package_size);
            char * contBuf = new char[package_size];
            dataRead = 0;
            dr = recv(socket, contBuf, package_size, NULL);
            if (dr == SOCKET_ERROR)  
                delete[] contBuf; 
                return WSAGetLastError();
            
            unsigned long long actualSize = decodeTotalFrameSize(contBuf);
            if (package_size < actualSize) 
                char * backBuffer = new char[package_size];
                memcpy_s(backBuffer, package_size, contBuf, package_size);
                delete[] contBuf;
                contBuf = new char[actualSize];
                memcpy_s(contBuf, actualSize, backBuffer, package_size);
                delete[] backBuffer;
                dataRead = package_size;
                dr = 0;
                while (dataRead < actualSize) 
                    dr = recv(socket, contBuf + dataRead, actualSize - dataRead, NULL);
                    if (dr == SOCKET_ERROR) 
                        delete[] contBuf;
                        return WSAGetLastError();
                    
                    else if (dr == 0) break;
                    dataRead += dr;
                
                printf("Read total frag of %u \n", dataRead);

            

【讨论】:

FIONREAD 在这里仍然是多余的。 recv() 将阻止并返回任何可供读取的内容。两个系统调用不是一个好习惯。 @EJP 对不起,如果我在这里听起来像个白痴,但recv() 需要知道要读取的数据的大小对吗?那么我怎么知道要读多少呢?除非我只是读入一个预定义大小足以解码总帧大小的数组?

以上是关于C++ websocket 服务器处理消息碎片的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 websocket websocketpp 发送和接收消息?

以快递方式从 POST 请求处理程序发送 WebSocket 消息

socket.io websocket

Socket.IO 上的 PHP 应用程序 + 基于 WebSocket 的通知

从 Tomcat 9 到客户端的 Websocket 二进制消息拆分为 2 个或更多 tcp 数据包

Jetty Websockets - 在处理不可靠的连接时正确发送异步消息