手把手写C++服务器(37):手撕代码——高并发多线程技术基石之异步connect万字长文

Posted 沉迷单车的追风少年

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手把手写C++服务器(37):手撕代码——高并发多线程技术基石之异步connect万字长文相关的知识,希望对你有一定的参考价值。

本系列文章导航: 手把手写C++服务器(0):专栏文章-汇总导航【更新中】 

前言:connect创建的时候是默认阻塞模式的,但是现实情况里可能会因为网络差、中间代理服务器、网关等因素造成连接速度慢。此时,在阻塞模式下,程序会阻塞在connect中很久。因此,在实际的项目中,我们一般倾向于使用异步connect技术,学习如何利用IO复用技术设置异步connect,不仅能为后面高并发多线程打下基础,也是后端开发面试必知必会的知识点。

目录

预备知识

1、connect函数

函数返回:

2、getsocketopt方法

 getsockopt、setsockopt

常用选项选讲:

3、IO复用之select

函数返回

参数详解

重要结构体详解

使用流程

正式开始

1、代码流程

2、客户端代码

3、服务端代码

4、实验效果

参考


预备知识

1、connect函数

客户端使用connect()与服务端建立连接:

#include<sys/types.h>
#include<sys/socket.h>
int connect(int sockfd,const struct sockaddr*serv_addr,socklen_t addrlen);

sockfd参数由socket系统调用返回一个socket。 serv_addr参数是服务器监听的socket地址, addrlen参数则指定这个地址的长度。

函数返回:

connect成功时返回0。 一旦成功建立连接, sockfd就唯一地标识了这个连接, 客户端就可以通过读写sockfd来与服务器通信。 connect失败则返回-1并设置errno。 其中两种常见的errno是ECONNREFUSED和ETIMEDOUT, 它们的含义如下:

  • ECONNREFUSED, 目标端口不存在, 连接被拒绝。
  • ETIMEDOUT, 连接超时。

2、getsocketopt方法

 getsockopt、setsockopt

读取和设置文件描述符的属性和方法。

#include <sys/socket.h>
int getsockopt(int sockfd, int level, int option_name, void* option_value, socklen_t* restrict option_len);
int setsockopt(int sockfd, int level, int option_name, const void* option_value, socklen_t* restrict option_len);

level:指定属性,如IPv4、IPv6、TCP等

具体选项参数含义:getsockopt

#define SOL_IP 0

#define SOL_IPX 256

#define SOL_AX25 257

#define SOL_ATALK 258

#define SOL_NETROM 259

#define SOL_TCP 6

#define SOL_UDP 17

#define SOL_SOCKET 0xffff

常用选项选讲:

1、SO_REUSEADDR

当TCP连接处于TIME_WAIT状态的时候,SO_REUSEADDR来强制使用被处于TIME_WAIT状态的连接占用的socket地址。使该地址能立即被重用。

2、SO_RCVBUF

TCP接收缓冲区大小

3、SO_SNDBUF

TCP发送缓冲区大小

4、SO_RCVLOWAT

TCP接收缓冲区低水位标记,被I/O复用系统调用用来判断socket是否可写。

5、SO_SNDLOWAT

TCP发送缓冲区低水位标记,被I/O复用系统调用用来判断socket是否可写。

6、SO_LINGER

控制close系统调用在关闭TCP连接时的行为

3、IO复用之select

select的作用是在一段指定的时间内,监听用户感兴趣的文件描述符上的可读、可写、异常等事件。函数原型如下:

#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
                fd_set *exceptfds, struct timeval *timeout);

函数返回

  • select成功时返回就绪文件描述符的总数;
  • 如果在超时时间内没有任何文件描述符就绪,select将返回0;
  • select失败时返回-1并设置errno。;
  • 如果在select等待期间,程序接收到信号,select立即返回-1,并将errno设置为EINTR。

参数详解

  • nfds:指定被监听文件描述符总数。通常被设置为select监听所有文件描述符中的最大值+1。
  • readfds:可读事件对应文件描述符集合。
  • writefds:可写事件对应文件描述符集合。
  • exceptfds:异常事件对应文件描述符集合。
  • timeout:设置select超时时间。

重要结构体详解

readfds、writefds、exceptfds都是fd_set结构体,timeout是timeval结构体,这里详解一下这两个结构体。

1、fd_set

fd_set结构体定义比较复杂,涉及到位操作,比较复杂。所以通常用宏来访问fd_set中的位。

#include <sys/select.h>
FD_ZERO(fd_set* fdset);    // 清除fdset中的所有位
FD_SET(int fd, fd_set* fdset); // 设置fdset中的位
FD_CLR(int fd, fd_set* fdset); // 清除fdset中的位
int FD_ISSET(int fd, fd_set* fdset);  // 测试fdset的位fd是否被设置
  • FD_ZERO用来清空文件描述符组。每次调用select前都需要清空一次。
  • FD_SET添加一个文件描述符到组中,FD_CLR对应将一个文件描述符移出组中。
  • FD_ISSET检测一个文件描述符是否在组中,我们用这个来检测一次select调用之后有哪些文件描述符可以进行IO操作。

2、timeval

struct timeval {
    long tv_sec; // 秒数
    long tv_usec; // 微妙数
};

使用流程

综上所述,我们一般的使用流程是:

  1. 准备工作——定义readfds、timeval等
  2. 使用FD_ZERO清零,使用FD_SET设置文件描述符。因为事件发生后,文件描述符集合都将被内核修改。
  3. 调用select
  4. 使用FD_ISSET检测文件描述符是否在组中

正式开始

1、代码流程

  1. 套用socket一般框架。
  2. 将创建的socket设置为非阻塞模式。
  3. 连接服务器。
  4. 调用select监听连接失败的socket上的可写事件。
  5. 调用getsockopt读取错误码并清除socket上的错误。

2、客户端代码

#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <iostream>
#include <fcntl.h>

// #define SEND_DATA "NO BLOCKING MODEL"
#define SEND_DATA "hello"

using namespace std;

// 将文件描述符设置为非阻塞模式
int setnoblocking(int fd) {
    // 获取文件描述符旧的状态标志
    int old_option = fcntl(fd, F_GETFL);
    // 设置非阻塞标志
    int new_option = old_option | O_NONBLOCK;
    // 设置非阻塞模式
    if (fcntl(fd, F_SETFL, new_option) == -1) {
        // std::cout << "set no blocking model is error" << std::endl;
        return -1;
    }
    // 返回文件描述符旧的状态标志,以便日后恢复改状态标志
    return old_option;
}

// 设置非阻塞connect
int set_unblock_connect(int port, int time = 10) {
    //创建socket
    int clientfd = socket(AF_INET, SOCK_STREAM, 0);
    if (clientfd == -1) {
        std::cout << "create client error" << std::endl;
        return -1;
    }
    //向服务器(特定的IP和端口)发起请求
    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));  //每个字节都用0填充
    serv_addr.sin_family = AF_INET;  //使用IPv4地址
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  //具体的IP地址
    serv_addr.sin_port = htons(port);  //端口

    // 将clientfd设置为非阻塞模式
    int oldconnectfd = setnoblocking(clientfd);
    if (oldconnectfd == -1) {
        std::cout << "set no block model is error" << std::endl;
        close(clientfd);
        return -1;
    }

    int connectfd = connect(clientfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
    if (connectfd == 0) {   // 连接成功
        std::cout << "connect success!" << std::endl;
        // 恢复clientfd属性
        fcntl(clientfd, F_SETFL, oldconnectfd);
        return clientfd;
    } else if (errno != EINPROGRESS) {
        // 只有EINPROGRESS模式才表示连接还在进行
        std::cout << "connect is error!" << std::endl;
        return -1;
    }
    fd_set readfds;
    fd_set writefds;
    struct timeval timeout;
    FD_ZERO(&readfds);
    FD_SET(clientfd, &writefds);

    timeout.tv_sec = time;
    timeout.tv_usec = 0;

    int ret = select(clientfd + 1, nullptr, &writefds, nullptr, &timeout);
    if (ret <= 0) {
        // select设置出错或超时
        std::cout << "set select is error" << std::endl;
        close(clientfd);
        return -1;
    }
    if (!FD_ISSET(clientfd, &writefds)) {
        std::cout << "no events on clientfd found!" << std::endl;
        close(clientfd);
        return -1;
    }

    int error = 0;
    socklen_t length = static_cast<socklen_t>(sizeof error);
    // 使用getsockopt获取并清除sockfd上面的错误
    if(getsockopt(clientfd, SOL_SOCKET, SO_ERROR, &error, &length) < 0) {
        std::cout << "get socket option is error" << std::endl;
        close(clientfd);
        return -1;
    }
    if (error != 0) {
        std::cout << "connect is fail after select with error " << error << std:: endl;
        close(clientfd);
        return -1;
    }
    // 连接成功
    std::cout << "connect is success after select!" << std::endl;
    fcntl(clientfd, F_SETFL, oldconnectfd);
    return clientfd;

}

int main(int argc, char* argv[]) {

    if (argc <= 1) {
        printf("error! please input port!\\n");
        return 1;
    }

    // 第一个入参是端口
    int port = atoi(argv[1]);

    // 设置非阻塞connect
    int clientfd = set_unblock_connect(port, 10);
    if (clientfd < 0) {
        std::cout << "setting unblock connect is error" << std::endl;
        return 0;
    }

    // while (true) {
    //     char recvbuf[32] = {0};
    //     // 非阻塞模式下,无论是否有数据,都不会阻塞程序
    //     int ret = recv(clientfd, recvbuf, 32, 0);
    //     if (ret > 0) {
    //         std::cout << "recv data successfully!" << std::endl;
    //     } else if(ret == 0) {
    //         std::cout << "socket is closed!" << std::endl;
    //         break;
    //     } else {
    //         if (errno == EWOULDBLOCK) {
    //             std::cout << "no data avaliable" << std::endl;
    //         } else if (errno == EINTR) {
    //             std::cout << "recv data interruptered by signal" << std::endl;
    //         } else {
    //             break;
    //         }
    //     }
    // }
    close(clientfd);
    return 0;
}

3、服务端代码

// 设置非阻塞模式
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <iostream>

int main(int argc, char* argv[]) {

    if (argc <= 1) {
        printf("error! please input port!\\n");
        return 1;
    }

    // 创建监听socket
    int listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
    assert(listenfd);
    int port = atoi(argv[1]);
 
    // 将套接字和IP、端口绑定
    struct sockaddr_in serv_addr;
    memset(&serv_addr, 0, sizeof(serv_addr));  //每个字节都用0填充
    serv_addr.sin_family = AF_INET;  //使用IPv4地址
    serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1");  //具体的IP地址
    serv_addr.sin_port = htons(port);  //端口
    if (bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1) {
        std::cout << "bind listen socket is error" << std::endl;
        close(listenfd);
        return -1;
    }
 
    // 进入监听状态,等待用户发起请求
    if (listen(listenfd, SOMAXCONN) == -1) {
        std::cout << "listen error" << std::endl;
        close(listenfd);
        return -1;
    }
 
    // 接收客户端请求
    while (true) {
        struct sockaddr_in clientaddr;
        socklen_t clientaddrlen = sizeof(clientaddr);
        int clientfd = accept(listenfd, (struct sockaddr *)& clientaddr, &clientaddrlen);
        if (clientfd != -1) {
            std::cout << "accept a client connection" << std::endl;
        } else {
            std::cout << "accept connection error!" << std::endl;
        }
    }

    close(listenfd);
    return 0;
}

4、实验效果

参考

以上是关于手把手写C++服务器(37):手撕代码——高并发多线程技术基石之异步connect万字长文的主要内容,如果未能解决你的问题,请参考以下文章

手把手写C++服务器(35):手撕代码——高并发高QPS技术基石之非阻塞send万字长文

手把手写C++服务器(30):手撕代码——基于TCP/IP的抛弃服务discard

手把手写C++服务器(29):手撕echo回射服务器代码

手把手写C++服务器(28):手撕CGI通用网关接口服务器代码

手把手写C++服务器(11):手撕网络带宽测试工具TTCP

手把手写C++服务器(34):高并发高吞吐IO秘密武器——epoll池化技术两万字长文