C ++ Socket recv无法跟上某些计算机上的发送

Posted

技术标签:

【中文标题】C ++ Socket recv无法跟上某些计算机上的发送【英文标题】:C++ Socket recv can't keep up with send on certain computers 【发布时间】:2020-06-25 19:11:59 【问题描述】:

我有一个简单的 TCP/UDP 套接字代码,它所做的只是发送一个以微秒为单位的时间戳,然后接收端在数据包到达时获取一个时间戳并显示它们之间的微秒差异。

在某些计算机上,diff 大部分时间为 0,偶尔会出现尖峰,而在我测试过的其他计算机上,diff 会逐渐变大,并且读取端跟不上发送端。

我也用 Python 对此进行了测试,并发现了相同的行为。我已经测试了所有可能的套接字类型和标志。这感觉像是系统配置问题。我已经在 Windows 7/8/10 和 Server 2012 R2 上进行了测试。这种行为在每个人(大公司,大量计算机测试)上都会发生,也不会发生。

我也尝试了一些注册表优化,但似乎没有任何帮助,因为这个问题也发生在 TCP 和 UDP 中。例如,我将附上我的 C++ 代码,您可以在您的计算机上运行它,看看会发生什么,这两个选项中的任何一个都有可能。

发件人:

#pragma comment(lib, "Ws2_32.lib")
#include "winsock2.h"
#include <iostream>
#include <string>
#include <chrono>
#include "Mstcpip.h"
#include "WS2tcpip.h"

#define BUFFSIZE 16

const char* getSocketErrorDescription(int socketError)

    static  char msg[100];
    sprintf(msg, "Winsock error code");
    return msg;


int write_select(UINT_PTR filedes, long microsSec)

    fd_set set;
    struct timeval timeout;
    FD_ZERO(&set);
    FD_SET(filedes, &set);
    timeout.tv_sec = 0;
    timeout.tv_usec = microsSec;

    /* select returns 0 if timeout, 1 if input available, -1 if error. */
    return select(FD_SETSIZE, NULL, &set, NULL, &timeout);


int main()

    int counter = 0;
    WORD version = MAKEWORD(2, 2);
    WSADATA data;
    WSAStartup(version, &data);
    sockaddr_in localAddr;
    ZeroMemory(&localAddr, sizeof(localAddr));

    localAddr.sin_family = PF_INET;
    localAddr.sin_port = htons((u_short)5555);
    //localAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
    InetPton(PF_INET, "127.0.0.1", &(localAddr.sin_addr));

    SOCKET sockfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sockfd == INVALID_SOCKET)
    
        std::cout << "SocketClient::create socket failed" << std::endl;
    

    if (connect(sockfd, (sockaddr*)&localAddr, sizeof(localAddr)) == SOCKET_ERROR)
    
        std::cout << ("connect failed. %s", getSocketErrorDescription(WSAGetLastError())) << std::endl;
    

    std::cout << "Sending..." << std::endl;
    while (true)
    
        counter++;
        std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
        auto duration = now.time_since_epoch();
        auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(duration);

        __int64 s_t = microseconds.count();
        std::string strMsg = std::to_string(s_t);
        while (write_select(sockfd, 10000) == 0)
        
            std::cout << ("%s: timeout", strMsg.c_str()) << std::endl;
        
        if (send(sockfd, strMsg.c_str(), BUFFSIZE, 0) == SOCKET_ERROR)
        
            int socketError = WSAGetLastError();
            std::cout << ("failed to send string. %s", getSocketErrorDescription(socketError)) << std::endl;;
        
        Sleep(2);
        std::cout << "Counter: " << counter << std::endl;
    


接收者:

#pragma comment(lib, "Ws2_32.lib")
#include "winsock2.h"
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <chrono>
#include "Mstcpip.h"
#include "WS2tcpip.h"

#define BUFFSIZE 16
const char* getSocketErrorDescription(int socketError)

    static  char msg[100];
    sprintf(msg, "Winsock error code");
    return msg;


int read_select(UINT_PTR filedes, long microsSec)

    fd_set set;
    struct timeval timeout;
    FD_ZERO(&set);
    FD_SET(filedes, &set);
    timeout.tv_sec = 0;
    timeout.tv_usec = microsSec;

    /* select returns 0 if timeout, 1 if input available, -1 if error. */
    return select(FD_SETSIZE, &set, NULL, NULL, &timeout);


void parseMsg(std::string& buffer, std::vector<std::string>* messages_list)

    while (buffer.size() >= 16)
    
        messages_list->push_back(buffer.substr(0, 16));
        buffer.erase(0, 16);
    


int main()

    int counter = 0;
    //std::ofstream res_file;
    //res_file.open("D:\\cpp_times.txt");
    WORD version = MAKEWORD(2, 2);
    WSADATA data;
    WSAStartup(version, &data);
    sockaddr_in localAddr;
    ZeroMemory(&localAddr, sizeof(localAddr));

    localAddr.sin_family = PF_INET;
    localAddr.sin_port = htons((u_short)5555);
    //localAddr.sin_addr.s_addr = INADDR_ANY;
    InetPton(PF_INET, "127.0.0.1", &(localAddr.sin_addr));

    SOCKET sockfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sockfd == INVALID_SOCKET)
    
        std::cout << "SocketClient::create socket failed" << std::endl;
    

    if(bind(sockfd, (sockaddr*)&localAddr, sizeof(localAddr)) != 0)
    
        int socketError = WSAGetLastError();
        std::cout << ("bind  failed  %s", getSocketErrorDescription(socketError)) << std::endl;
    
    if (listen(sockfd, SOMAXCONN) == SOCKET_ERROR)
    
        std::cout << ("listen  failed") << std::endl;
    

    int clilen = sizeof(localAddr);
    SOCKET new_socket = accept(sockfd, (sockaddr*)&localAddr, &clilen);
    if (new_socket < 0)
        std::cout << "ERROR on accept" << std::endl;


    std::cout << "Listening..." << std::endl;
    while (true)
    
        int nbytesRead = 0;
        std::string m_buffer;
        char recvbuf[BUFFSIZE];
        // Receive until the peer closes the connection 
//      if (read_select(new_socket, 10000))
//      
//          nbytesRead = recv(new_socket, recvbuf, BUFFSIZE, 0);
//          
//      
//      else
//      
//          //std::cout << ("%s: timeout", strMsg.c_str()) << std::endl;
//          continue;
//      
        nbytesRead = recv(new_socket, recvbuf, BUFFSIZE, 0);
        if (nbytesRead > 0)
        
            std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
            auto duration = now.time_since_epoch();
            auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(duration);

            __int64 r_t = microseconds.count();
            std::string strMsg = std::to_string(r_t);
            m_buffer.append(recvbuf, nbytesRead);
            std::vector<std::string> message_list;
            parseMsg(m_buffer, &message_list);
            for (auto msg : message_list)
            
                counter++;
                __int64 diff = r_t - _atoi64(msg.c_str());
                std::cout << "SendTime: " << msg.c_str() << " ReceiveTime: " << r_t << " Diff: " << diff << std::endl;
                std::cout << "Counter: " << counter << std::endl;
                //res_file << diff << std::endl;
            
        
        else
        
            continue;
        
        
    

【问题讨论】:

您正在尝试通过 TCP 发送消息。 TCP 不是消息协议,因此您必须自己实现一个并将其置于 TCP 之上。 【参考方案1】:

我发现这段代码有几个问题。但最值得注意的是,在发件人方面,您对send() 的调用是错误的。

if (send(sockfd, strMsg.c_str(), BUFFSIZE, 0) == SOCKET_ERROR)

您发送的std::string 的内容不能保证大小为BUFFSIZE。可能更短,也可能更长,取决于s_t 的实际数值。您应该改用strMsg.size()(有或没有+1,取决于您是否要发送空终止符)。

if (send(sockfd, strMsg.c_str(), strMsg.size()+1, 0) == SOCKET_ERROR)

实际上,您应该在循环中调用send(),因为不能保证发送的字节数与您请求的一样多,它可能发送的字节数可能会减少,因此您必须再次调用send() 才能完成发送剩余字节数。

在接收端,您正确地使用了recv() 的返回值来了解每次调用接收了多少字节,并且您正在尝试缓冲接收到的字节并仅解析您想要的 16 个字节的倍数。总的来说,这是一个很好的方法。但是,您的 m_buffer 变量在错误的范围内声明,因此在每次 recv() 调用后它都会被销毁。字符串格式的__int64 可以少于或多于 16 个字符。它可以少至 1 个字符,也可以多至 19 个字符(如果算上负值的前导 -,则为 20)。

对于您正在尝试的内容,无需将__int64 转换为std::string 进行传输,然后再转换回__int64。您可以按原样以原始二进制形式传输__int64

我看到的其他错误包括错误地使用了select()timeval 参数,错误地使用了std::cout,以及缺乏良好的错误处理。

尝试类似的方法:

发件人:

#pragma comment(lib, "Ws2_32.lib")
#include "winsock2.h"
#include <iostream>
#include <string>
#include <chrono>
#include <cstdint>
#include "Mstcpip.h"
#include "WS2tcpip.h"

const char* getSocketErrorDescription(int socketError)

    static  char msg[100];
    sprintf(msg, "Winsock error code %d", socketError);
    return msg;


int write_select(SOCKET sockfd, long microSec)

    fd_set set;
    struct timeval timeout;
    FD_ZERO(&set);
    FD_SET(sockfd, &set);
    timeout.tv_sec = microSec / 1000000;
    timeout.tv_usec = microSec % 1000000;

    /* select returns 0 if timeout, 1 if input available, -1 if error. */
    return select(0/*sockfd+1*/, NULL, &set, NULL, &timeout);


int main()

    int counter = 0;
    WORD version = MAKEWORD(2, 2);
    WSADATA data;

    int res = WSAStartup(version, &data);
    if (res != 0)
    
        std::cout << "WSAStartup failed. " << getSocketErrorDescription(res) << std::endl;
        return 0;
    

    sockaddr_in localAddr;
    ZeroMemory(&localAddr, sizeof(localAddr));

    localAddr.sin_family = PF_INET;
    localAddr.sin_port = htons(5555);
    //localAddr.sin_addr.s_addr = inet_addr("127.0.0.1");
    InetPton(PF_INET, "127.0.0.1", &(localAddr.sin_addr));

    SOCKET sockfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sockfd == INVALID_SOCKET)
    
        std::cout << "socket failed. " << getSocketErrorDescription(WSAGetLastError()) << std::endl;
        WSACleanup();
        return 0;
    

    if (connect(sockfd, (sockaddr*)&localAddr, sizeof(localAddr)) == SOCKET_ERROR)
    
        std::cout << "connect failed. " << getSocketErrorDescription(WSAGetLastError()) << std::endl;
        closesocket(sockfd);
        WSACleanup();
        return 0;
    

    std::cout << "Sending..." << std::endl;
    bool keepGoing = true;
    do
    
        counter++;
        std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
        auto duration = now.time_since_epoch();
        auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(duration);

        int64_t s_t = microseconds.count();

        while (write_select(sockfd, 10000) == 0)
        
            std::cout << s_t << ": timeout" << std::endl;
        

        char *ptr = reinterpret_cast<char*>(&s_t);
        size_t size = sizeof(s_t);
        do
        
            res = send(sockfd, ptr, size, 0);
            if (res == SOCKET_ERROR)
            
                std::cout << "send failed. " << getSocketErrorDescription(WSAGetLastError()) << std::endl;
                keepGoing = false;
                break;
            
            ptr += res;
            size -= res;
        
        while (size > 0);

        if (!keepGoing)
            break;

        Sleep(2);
        std::cout << "Counter: " << counter << std::endl;
    
    while (true);

    closesocket(sockFd);
    WSACleanup();

    return 0;


接收者:

#pragma comment(lib, "Ws2_32.lib")
#include "winsock2.h"
#include <iostream>
#include <fstream>
#include <string>
#include <vector>
#include <chrono>
#include <cstdint>
#include <cstring>
#include "Mstcpip.h"
#include "WS2tcpip.h"

#define BUFFSIZE 16

const char* getSocketErrorDescription(int socketError)

    static  char msg[100];
    sprintf(msg, "Winsock error code %d", socketError);
    return msg;


int read_select(SOCKET sockfd, long microsSec)

    fd_set set;
    struct timeval timeout;
    FD_ZERO(&set);
    FD_SET(sockfd, &set);
    timeout.tv_sec = microsSec / 1000000;
    timeout.tv_usec = microsSec % 1000000;

    /* select returns 0 if timeout, 1 if input available, -1 if error. */
    return select(0/*sockFd+1*/, &set, NULL, NULL, &timeout);


void parseMsg(std::string& buffer, std::vector<int64_t>& messages_list)

    while (buffer.size() >= sizeof(int64_t))
    
        int64_t value;
        std::memcpy(&value, buffer.data(), sizeof(value));
        messages_list.push_back(value);
        buffer.erase(0, sizeof(value));
    


int main()

    int counter = 0;
    //std::ofstream res_file;
    //res_file.open("D:\\cpp_times.txt");
    WORD version = MAKEWORD(2, 2);
    WSADATA data;

    int res = WSAStartup(version, &data);
    if (res != 0)
    
        std::cout << ("WSAStartup failed. " << getSocketErrorDescription(res) << std::endl;
        return 0;
    

    sockaddr_in localAddr;
    ZeroMemory(&localAddr, sizeof(localAddr));

    localAddr.sin_family = PF_INET;
    localAddr.sin_port = htons(5555);
    //localAddr.sin_addr.s_addr = INADDR_ANY;
    InetPton(PF_INET, "127.0.0.1", &(localAddr.sin_addr));

    SOCKET sockfd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
    if (sockfd == INVALID_SOCKET)
    
        std::cout << "socket failed. " << getSocketErrorDescription(WSAGetLasstError()) << std::endl;
        WSACleanup();
        return 0;
    

    if (bind(sockfd, (sockaddr*)&localAddr, sizeof(localAddr)) == SOCKET_ERROR)
    
        std::cout << "bind failed. " << getSocketErrorDescription(WSAGetLastError()) << std::endl;
        closesocket(sockfd);
        WSACleanup();
        return 0;
    

    if (listen(sockfd, SOMAXCONN) == SOCKET_ERROR)
    
        std::cout << "listen failed. " << getSocketErrorDescription(WSAGetLastError()) << std::endl;
        closesocket(sockfd);
        WSACleanup();
        return 0;
    

    int clilen = sizeof(localAddr);
    SOCKET new_socket = accept(sockfd, (sockaddr*)&localAddr, &clilen);
    if (new_socket == INVALID_SOCKET)
    
        std::cout << "accept failed. " << getSocketErrorDescription(WSAGetLastError()) << std::endl;
        closesocket(sockfd);
        WSACleanup();
        return 0;
    

    std::cout << "Listening..." << std::endl;

    std::string m_buffer;
    char recvbuf[BUFFSIZE];

    while (true)
    
        int nbytesRead;

        // Receive until the peer closes the connection 
//      if (read_select(new_socket, 10000))
//      
//          nbytesRead = recv(new_socket, recvbuf, BUFFSIZE, 0);
//      
//      else
//      
//          //std::cout << strMsg << ": timeout" << std::endl;
//          continue;
//      

        nbytesRead = recv(new_socket, recvbuf, BUFFSIZE, 0);
        if (nbytesRead <= 0)
        
            if (nbytesRead < 0)
                std::cout << "recv failed. " << getSocketErrorDescription(WSAGetLastError()) << std::endl;
            else
                std::cout << "client disconnected" << std::endl;
            closesocket(new_socket);
            break;
        

        m_buffer.append(recvbuf, nbytesRead);

        std::chrono::time_point<std::chrono::system_clock> now = std::chrono::system_clock::now();
        auto duration = now.time_since_epoch();
        auto microseconds = std::chrono::duration_cast<std::chrono::microseconds>(duration);

        __int64 r_t = microseconds.count();

        std::vector<int64_t> message_list;
        parseMsg(m_buffer, message_list);

        for (auto msg : message_list)
        
            counter++;
            int64_t diff = r_t - msg;
            std::cout << "SendTime: " << msg << " ReceiveTime: " << r_t << " Diff: " << diff << std::endl;
            std::cout << "Counter: " << counter << std::endl;
            //res_file << diff << std::endl;
        
    

    closesocket(sockfd);
    WSACleanup();
    
    return 0;

【讨论】:

是的,我知道,但这条消息的大小固定为 16,所以它没有意义。 @עידןבוגנים:你似乎忽略了雷米的最后两句话。 @עידןבוגנים 您显示的发件人代码中没有任何内容可以确保它为每个 __int64 字符串发送固定的 16 个字节。只有在接收器端,您才使用固定的 16 字节缓冲区来接收字节。无论如何,我已经用更多细节和一个新的代码示例更新了我的答案。 @עידןבוגנים 那么确保您的发送方始终准确发送 16 个字节而您的接收方始终准确接收 16 个字节的代码在哪里?据我所知,您的发件人发送strMsg.c_str() 字节。并且您收到的接收最多 16 个字节。此外,您的代码是阻塞和非阻塞之间的奇怪混合。如果只是为了测试/实验目的,那可能没关系。 感谢您的帮助,但我认为您忽略了这一点,此代码的工作方式与我期望的完全一样,我的问题是为什么在某些计算机上 recv 无法跟上数据包.

以上是关于C ++ Socket recv无法跟上某些计算机上的发送的主要内容,如果未能解决你的问题,请参考以下文章

通过 send() recv() 发送多条消息,Socket 编程,C

socket_recv():无法从socket [0]读取:操作成功完成

Python socket Recv 无法正常工作,有人可以解释一下吗

C++ Socket recv() 总是返回 0

socket 的recv函数返回0应该怎样处理

ZMQ:socket_send/recv 阻塞