手把手写C++服务器(36):手撕代码——高并发高QPS技术基石之非阻塞recv万字长文

Posted 沉迷单车的追风少年

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手把手写C++服务器(36):手撕代码——高并发高QPS技术基石之非阻塞recv万字长文相关的知识,希望对你有一定的参考价值。

本系列文章导航: 手把手写C++服务器(0):专栏文章-汇总导航【更新中】 

前言:没有什么东西是永恒,没有什么方案是万能,阻塞模式和非阻塞模式各有利弊。创建socket是默认阻塞的。但是在高并发多QPS的场景中,阻塞模式会极大程度上影响并发性,使之并发名存实亡。上一讲手把手写C++服务器(35):手撕代码——高并发高QPS技术基石之非阻塞send【万字长文】_沉迷单车的追风少年-CSDN博客讲了非阻塞send,并进行了实验;这一讲重点讲recv,为后面继续讨论非阻塞connect及与IO复用结合打下基础。

目录

预备知识

1、send/recv的本质

2、阻塞模式和非阻塞模式的应用场景

3、文件描述符控制函数:fcntl()

参数详解

4、设置文件描述符为非阻塞模式

5、服务端、客户端响应/请求一般框架

服务端

客户端

正式开始

客户端代码:

服务端代码:

阻塞实验效果:

非阻塞实验效果:

 参考:


预备知识

1、send/recv的本质

send和recv并不是直接收发数据,send函数本质上并不是在网络上发送数据,而是将发送缓冲区的数据拷贝到数据内核中;recv函数的本质是将内核缓冲区中的数据拷贝到应用程序的缓冲区中。

什么时候拷贝、什么时候正式发送,需要看是否启用了nagel算法。如果禁用了nagel算法,存放到内核缓冲区的数据会被立即发出去;有兴趣的同学可以去看一下nagel算法,这也是面试中经常考察的知识点。

如果数据缓冲区满了,阻塞模式和非阻塞模式的表现是不一样的

  • 当socket是阻塞模式时,程序会阻塞在send/recv处。
  • 当socket是非阻塞模式时,send/recv不会阻塞当前程序执行流,会立即返回错误:EWOULDBLOCK或EAGAIN。后面我们会用实验验证这一点。

2、阻塞模式和非阻塞模式的应用场景

没有什么东西是永恒,没有什么方案是万能。非阻塞模式一般用于高并发多线程场景,在编写服务端程序的过程中,务必记住一定要把IO操作和业务逻辑区分开。在此基础上,非阻塞模式会让程序的控制逻辑变得更加复杂。

阻塞模式常见的应用场景:

  1. 临时发送大文件。
  2. 端到端问答模式。
  3. 监听主线程。
  4. ……

后面会专门多开几讲详细讨论这些问题。

3、文件描述符控制函数:fcntl()

不太清楚的小伙伴请看往期:手把手写C++服务器(27):五大文件描述符零拷贝、fcntl控制总结_沉迷单车的追风少年-CSDN博客

file control,文件描述符控制。与之类似的系统调用是ioctl。

#include<unistd.h>
#include<fcntl.h>
int fcntl(int fd, int cmd);
int fcntl(int fd, int cmd, long arg);
int fcntl(int fd, int cmd ,struct flock* lock);

参数详解

4、设置文件描述符为非阻塞模式

#include <fcntl.h>

// 将文件描述符设置为非阻塞模式
int setnoblocking(int fd) {
    // 获取文件描述符旧的状态标志
    int old_option = fcntl(fd, F_GETFL);
    // 设置非阻塞标志
    int new_option = old_option | O_NONBLOCK;
    // 设置非阻塞模式
    fcntl(fd, F_SETFL, new_option);
    // 返回文件描述符旧的状态标志,以便日后恢复改状态标志
    return old_option;
}

5、服务端、客户端响应/请求一般框架

不太清楚的小伙伴请看往期:https://xduwq.blog.csdn.net/article/details/119482107

服务端

// 设置非阻塞模式
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
 
 
int main(int argc, char* argv[]) {
    if (argc <= 1) {
        printf("error! please input port!\\n");
        return 1;
    }
    //创建监听socket
    int listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    assert(serv_sock);
    int port = atoi(argv[1]);
 
    //将套接字和IP、端口绑定
    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));  //每个字节都用0填充
    serv_addr.sin_family = AF_INET;  //使用IPv4地址
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  //具体的IP地址
    serv_addr.sin_port = htons(port);  //端口
    if (bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1) {
        std::cout << "bind listen socket is error" << std::endl;
        close(listenfd);
        return -1;
    }
 
    //进入监听状态,等待用户发起请求
    if (listen(listenfd, 20) == -1) {
        std::cout << "listen error" << std::endl;
        close(listenfd);
        return -1;
    }
 
    //接收客户端请求
    struct sockaddr_in clnt_addr;
    socklen_t clnt_addr_size = sizeof(clnt_addr);
    int connfd = 0;
    for ( ; ; ) {
        if ((connfd = accept(listenfd, (struct sockaddr*)&clnt_addr, &clnt_addr_size)) < 0) {
            printf("accept error: %s\\n",strerror(errno));
            return 1;
        }
    }
    close(listenfd);
    return 0;
}

客户端

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <iostream>
 
using namespace std;
 
int main(int argc, char* argv[]){
    
    if (argc <= 1) {
        printf("error! please input port!\\n");
        return 1;
    }
 
    //创建socket
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (clientfd == -1) {
        std::cout << "create client error" << std::endl;
        return -1;
    }
 
    int port = atoi(argv[1]);
 
    //向服务器(特定的IP和端口)发起请求
    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));  //每个字节都用0填充
    serv_addr.sin_family = AF_INET;  //使用IPv4地址
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  //具体的IP地址
    serv_addr.sin_port = htons(port);  //端口
    
    int connectfd = connect(clientfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
    if (connectfd == -1) {
        std::cout << "connect error" << std::endl;
        close(clientfd);
        return -1;
    }
   
    //读取服务器传回的数据
    char buffer[40];
    read(clientfd, buffer, sizeof(buffer)-1);
   
    printf("Message form server: %s\\n", buffer);
   
    //关闭套接字
    close(clientfd);
 
    return 0;
}

正式开始

客户端代码:

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <iostream>
#include <fcntl.h>

// #define SEND_DATA "NO BLOCKING MODEL"
#define SEND_DATA "hello"

using namespace std;

// 将文件描述符设置为非阻塞模式
int setnoblocking(int fd) {
    // 获取文件描述符旧的状态标志
    int old_option = fcntl(fd, F_GETFL);
    // 设置非阻塞标志
    int new_option = old_option | O_NONBLOCK;
    // 设置非阻塞模式
    if (fcntl(fd, F_SETFL, new_option) == -1) {
        // std::cout << "set no blocking model is error" << std::endl;
        return -1;
    }
    // 返回文件描述符旧的状态标志,以便日后恢复改状态标志
    return old_option;
}

int main(int argc, char* argv[]) {

    if (argc <= 1) {
        printf("error! please input port!\\n");
        return 1;
    }

    //创建socket
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (clientfd == -1) {
        std::cout << "create client error" << std::endl;
        return -1;
    }
    // 第一个入参是端口
    int port = atoi(argv[1]);

    //向服务器(特定的IP和端口)发起请求
    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));  //每个字节都用0填充
    serv_addr.sin_family = AF_INET;  //使用IPv4地址
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  //具体的IP地址
    serv_addr.sin_port = htons(port);  //端口

    int connectfd = connect(clientfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
    if (connectfd == -1) {
        std::cout << "connect error" << std::endl;
        close(clientfd);
        return -1;
    }

    std::cout << "old clientfd option is " << fcntl(clientfd, F_GETFL, 0) << std::endl;

    // 连接成功后再将clientfd设置为非阻塞模式
    int oldconnectfd = setnoblocking(clientfd);
    if (oldconnectfd == -1) {
        std::cout << "set no block model is error" << std::endl;
        close(clientfd);
        return -1;
    }

    // // 获取文件描述符旧的状态标志
    // int old_option = fcntl(clientfd, F_GETFL, 0);
    // // 设置新的非阻塞标志
    // int new_option = old_option | O_NONBLOCK;
    // // 设置非阻塞模式
    // if (fcntl(clientfd, F_SETFL, new_option) == -1) {
    //     std::cout << "set no blocking model is error" << std::endl;
    //     return -1;
    // }

    std::cout << "new clientfd option is " << fcntl(clientfd, F_GETFL) << std::endl;
    std::cout << "new connectfd option is " << fcntl(connectfd, F_GETFL) << std::endl;

    while (true) {
        char recvbuf[32] = {0};
        // 非阻塞模式下,无论是否有数据,都不会阻塞程序
        int ret = recv(clientfd, recvbuf, 32, 0);
        if (ret > 0) {
            std::cout << "recv data successfully!" << std::endl;
        } else if(ret == 0) {
            std::cout << "socket is closed!" << std::endl;
            break;
        } else {
            if (errno == EWOULDBLOCK) {
                std::cout << "no data avaliable" << std::endl;
            } else if (errno == EINTR) {
                std::cout << "recv data interruptered by signal" << std::endl;
            } else {
                break;
            }
        }
    }

   
    //关闭套接字
    close(clientfd);
 
    return 0;
}

服务端代码:

// 设置非阻塞模式
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <iostream>

int main(int argc, char* argv[]) {

    if (argc <= 1) {
        printf("error! please input port!\\n");
        return 1;
    }

    // 创建监听socket
    int listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    assert(listenfd);
    int port = atoi(argv[1]);
 
    // 将套接字和IP、端口绑定
    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));  //每个字节都用0填充
    serv_addr.sin_family = AF_INET;  //使用IPv4地址
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  //具体的IP地址
    serv_addr.sin_port = htons(port);  //端口
    if (bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1) {
        std::cout << "bind listen socket is error" << std::endl;
        close(listenfd);
        return -1;
    }
 
    // 进入监听状态,等待用户发起请求
    if (listen(listenfd, SOMAXCONN) == -1) {
        std::cout << "listen error" << std::endl;
        close(listenfd);
        return -1;
    }
 
    // 接收客户端请求
    while (true) {
        struct sockaddr_in clientaddr;
        socklen_t clientaddrlen = sizeof(clientaddr);
        int clientfd = accept(listenfd, (struct sockaddr *)& clientaddr, &clientaddrlen);
        if (clientfd != -1) {
            std::cout << "accept a client connection" << std::endl;
        } else {
            std::cout << "accept connection error!" << std::endl;
        }
    }

    close(listenfd);
    return 0;
}

阻塞实验效果:

当前无数据可以读,会一直阻塞等待……

./server 300
accept a client connection
./client 300
old clientfd option is 2
new clientfd option is 2
new connectfd option is 2

非阻塞实验效果:

就算当前没有数据可以读,程序不会一直阻塞,会直接返回errno。通过解析errno,可以实现我们想要的实验结果。

no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable
no data avaliable

 参考:

以上是关于手把手写C++服务器(36):手撕代码——高并发高QPS技术基石之非阻塞recv万字长文的主要内容,如果未能解决你的问题,请参考以下文章

手把手写C++服务器(37):手撕代码——高并发多线程技术基石之异步connect万字长文

手把手写C++服务器(34):高并发高吞吐IO秘密武器——epoll池化技术两万字长文

手把手写C++服务器(30):手撕代码——基于TCP/IP的抛弃服务discard

手把手写C++服务器(29):手撕echo回射服务器代码

手把手写C++服务器(28):手撕CGI通用网关接口服务器代码

手把手写C++服务器(11):手撕网络带宽测试工具TTCP