手把手写C++服务器(37):手撕代码——高并发多线程技术基石之异步connect万字长文
Posted 沉迷单车的追风少年
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了手把手写C++服务器(37):手撕代码——高并发多线程技术基石之异步connect万字长文相关的知识,希望对你有一定的参考价值。
本系列文章导航: 手把手写C++服务器(0):专栏文章-汇总导航【更新中】
前言:connect创建的时候是默认阻塞模式的,但是现实情况里可能会因为网络差、中间代理服务器、网关等因素造成连接速度慢。此时,在阻塞模式下,程序会阻塞在connect中很久。因此,在实际的项目中,我们一般倾向于使用异步connect技术,学习如何利用IO复用技术设置异步connect,不仅能为后面高并发多线程打下基础,也是后端开发面试必知必会的知识点。
目录
预备知识
1、connect函数
客户端使用connect()与服务端建立连接:
#include<sys/types.h>
#include<sys/socket.h>
int connect(int sockfd,const struct sockaddr*serv_addr,socklen_t addrlen);
sockfd参数由socket系统调用返回一个socket。 serv_addr参数是服务器监听的socket地址, addrlen参数则指定这个地址的长度。
函数返回:
connect成功时返回0。 一旦成功建立连接, sockfd就唯一地标识了这个连接, 客户端就可以通过读写sockfd来与服务器通信。 connect失败则返回-1并设置errno。 其中两种常见的errno是ECONNREFUSED和ETIMEDOUT, 它们的含义如下:
- ECONNREFUSED, 目标端口不存在, 连接被拒绝。
- ETIMEDOUT, 连接超时。
2、getsocketopt方法
getsockopt、setsockopt
读取和设置文件描述符的属性和方法。
#include <sys/socket.h>
int getsockopt(int sockfd, int level, int option_name, void* option_value, socklen_t* restrict option_len);
int setsockopt(int sockfd, int level, int option_name, const void* option_value, socklen_t* restrict option_len);
level:指定属性,如IPv4、IPv6、TCP等
具体选项参数含义:getsockopt
#define SOL_IP 0
#define SOL_IPX 256
#define SOL_AX25 257
#define SOL_ATALK 258
#define SOL_NETROM 259
#define SOL_TCP 6
#define SOL_UDP 17
#define SOL_SOCKET 0xffff
常用选项选讲:
1、SO_REUSEADDR
当TCP连接处于TIME_WAIT状态的时候,SO_REUSEADDR来强制使用被处于TIME_WAIT状态的连接占用的socket地址。使该地址能立即被重用。
2、SO_RCVBUF
TCP接收缓冲区大小
3、SO_SNDBUF
TCP发送缓冲区大小
4、SO_RCVLOWAT
TCP接收缓冲区低水位标记,被I/O复用系统调用用来判断socket是否可写。
5、SO_SNDLOWAT
TCP发送缓冲区低水位标记,被I/O复用系统调用用来判断socket是否可写。
6、SO_LINGER
控制close系统调用在关闭TCP连接时的行为
3、IO复用之select
select的作用是在一段指定的时间内,监听用户感兴趣的文件描述符上的可读、可写、异常等事件。函数原型如下:
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds,
fd_set *exceptfds, struct timeval *timeout);
函数返回
- select成功时返回就绪文件描述符的总数;
- 如果在超时时间内没有任何文件描述符就绪,select将返回0;
- select失败时返回-1并设置errno。;
- 如果在select等待期间,程序接收到信号,select立即返回-1,并将errno设置为EINTR。
参数详解
- nfds:指定被监听文件描述符总数。通常被设置为select监听所有文件描述符中的最大值+1。
- readfds:可读事件对应文件描述符集合。
- writefds:可写事件对应文件描述符集合。
- exceptfds:异常事件对应文件描述符集合。
- timeout:设置select超时时间。
重要结构体详解
readfds、writefds、exceptfds都是fd_set结构体,timeout是timeval结构体,这里详解一下这两个结构体。
1、fd_set
fd_set结构体定义比较复杂,涉及到位操作,比较复杂。所以通常用宏来访问fd_set中的位。
#include <sys/select.h>
FD_ZERO(fd_set* fdset); // 清除fdset中的所有位
FD_SET(int fd, fd_set* fdset); // 设置fdset中的位
FD_CLR(int fd, fd_set* fdset); // 清除fdset中的位
int FD_ISSET(int fd, fd_set* fdset); // 测试fdset的位fd是否被设置
- FD_ZERO用来清空文件描述符组。每次调用select前都需要清空一次。
- FD_SET添加一个文件描述符到组中,FD_CLR对应将一个文件描述符移出组中。
- FD_ISSET检测一个文件描述符是否在组中,我们用这个来检测一次select调用之后有哪些文件描述符可以进行IO操作。
2、timeval
struct timeval {
long tv_sec; // 秒数
long tv_usec; // 微妙数
};
使用流程
综上所述,我们一般的使用流程是:
- 准备工作——定义readfds、timeval等
- 使用FD_ZERO清零,使用FD_SET设置文件描述符。因为事件发生后,文件描述符集合都将被内核修改。
- 调用select
- 使用FD_ISSET检测文件描述符是否在组中
正式开始
1、代码流程
- 套用socket一般框架。
- 将创建的socket设置为非阻塞模式。
- 连接服务器。
- 调用select监听连接失败的socket上的可写事件。
- 调用getsockopt读取错误码并清除socket上的错误。
2、客户端代码
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <iostream>
#include <fcntl.h>
// #define SEND_DATA "NO BLOCKING MODEL"
#define SEND_DATA "hello"
using namespace std;
// 将文件描述符设置为非阻塞模式
int setnoblocking(int fd) {
// 获取文件描述符旧的状态标志
int old_option = fcntl(fd, F_GETFL);
// 设置非阻塞标志
int new_option = old_option | O_NONBLOCK;
// 设置非阻塞模式
if (fcntl(fd, F_SETFL, new_option) == -1) {
// std::cout << "set no blocking model is error" << std::endl;
return -1;
}
// 返回文件描述符旧的状态标志,以便日后恢复改状态标志
return old_option;
}
// 设置非阻塞connect
int set_unblock_connect(int port, int time = 10) {
//创建socket
int clientfd = socket(AF_INET, SOCK_STREAM, 0);
if (clientfd == -1) {
std::cout << "create client error" << std::endl;
return -1;
}
//向服务器(特定的IP和端口)发起请求
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr)); //每个字节都用0填充
serv_addr.sin_family = AF_INET; //使用IPv4地址
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); //具体的IP地址
serv_addr.sin_port = htons(port); //端口
// 将clientfd设置为非阻塞模式
int oldconnectfd = setnoblocking(clientfd);
if (oldconnectfd == -1) {
std::cout << "set no block model is error" << std::endl;
close(clientfd);
return -1;
}
int connectfd = connect(clientfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr));
if (connectfd == 0) { // 连接成功
std::cout << "connect success!" << std::endl;
// 恢复clientfd属性
fcntl(clientfd, F_SETFL, oldconnectfd);
return clientfd;
} else if (errno != EINPROGRESS) {
// 只有EINPROGRESS模式才表示连接还在进行
std::cout << "connect is error!" << std::endl;
return -1;
}
fd_set readfds;
fd_set writefds;
struct timeval timeout;
FD_ZERO(&readfds);
FD_SET(clientfd, &writefds);
timeout.tv_sec = time;
timeout.tv_usec = 0;
int ret = select(clientfd + 1, nullptr, &writefds, nullptr, &timeout);
if (ret <= 0) {
// select设置出错或超时
std::cout << "set select is error" << std::endl;
close(clientfd);
return -1;
}
if (!FD_ISSET(clientfd, &writefds)) {
std::cout << "no events on clientfd found!" << std::endl;
close(clientfd);
return -1;
}
int error = 0;
socklen_t length = static_cast<socklen_t>(sizeof error);
// 使用getsockopt获取并清除sockfd上面的错误
if(getsockopt(clientfd, SOL_SOCKET, SO_ERROR, &error, &length) < 0) {
std::cout << "get socket option is error" << std::endl;
close(clientfd);
return -1;
}
if (error != 0) {
std::cout << "connect is fail after select with error " << error << std:: endl;
close(clientfd);
return -1;
}
// 连接成功
std::cout << "connect is success after select!" << std::endl;
fcntl(clientfd, F_SETFL, oldconnectfd);
return clientfd;
}
int main(int argc, char* argv[]) {
if (argc <= 1) {
printf("error! please input port!\\n");
return 1;
}
// 第一个入参是端口
int port = atoi(argv[1]);
// 设置非阻塞connect
int clientfd = set_unblock_connect(port, 10);
if (clientfd < 0) {
std::cout << "setting unblock connect is error" << std::endl;
return 0;
}
// while (true) {
// char recvbuf[32] = {0};
// // 非阻塞模式下,无论是否有数据,都不会阻塞程序
// int ret = recv(clientfd, recvbuf, 32, 0);
// if (ret > 0) {
// std::cout << "recv data successfully!" << std::endl;
// } else if(ret == 0) {
// std::cout << "socket is closed!" << std::endl;
// break;
// } else {
// if (errno == EWOULDBLOCK) {
// std::cout << "no data avaliable" << std::endl;
// } else if (errno == EINTR) {
// std::cout << "recv data interruptered by signal" << std::endl;
// } else {
// break;
// }
// }
// }
close(clientfd);
return 0;
}
3、服务端代码
// 设置非阻塞模式
#include <stdio.h>
#include <string.h>
#include <stdlib.h>
#include <unistd.h>
#include <arpa/inet.h>
#include <sys/socket.h>
#include <assert.h>
#include <errno.h>
#include <netinet/in.h>
#include <iostream>
int main(int argc, char* argv[]) {
if (argc <= 1) {
printf("error! please input port!\\n");
return 1;
}
// 创建监听socket
int listenfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
assert(listenfd);
int port = atoi(argv[1]);
// 将套接字和IP、端口绑定
struct sockaddr_in serv_addr;
memset(&serv_addr, 0, sizeof(serv_addr)); //每个字节都用0填充
serv_addr.sin_family = AF_INET; //使用IPv4地址
serv_addr.sin_addr.s_addr = inet_addr("127.0.0.1"); //具体的IP地址
serv_addr.sin_port = htons(port); //端口
if (bind(listenfd, (struct sockaddr*)&serv_addr, sizeof(serv_addr)) == -1) {
std::cout << "bind listen socket is error" << std::endl;
close(listenfd);
return -1;
}
// 进入监听状态,等待用户发起请求
if (listen(listenfd, SOMAXCONN) == -1) {
std::cout << "listen error" << std::endl;
close(listenfd);
return -1;
}
// 接收客户端请求
while (true) {
struct sockaddr_in clientaddr;
socklen_t clientaddrlen = sizeof(clientaddr);
int clientfd = accept(listenfd, (struct sockaddr *)& clientaddr, &clientaddrlen);
if (clientfd != -1) {
std::cout << "accept a client connection" << std::endl;
} else {
std::cout << "accept connection error!" << std::endl;
}
}
close(listenfd);
return 0;
}
4、实验效果
参考
以上是关于手把手写C++服务器(37):手撕代码——高并发多线程技术基石之异步connect万字长文的主要内容,如果未能解决你的问题,请参考以下文章
手把手写C++服务器(35):手撕代码——高并发高QPS技术基石之非阻塞send万字长文
手把手写C++服务器(30):手撕代码——基于TCP/IP的抛弃服务discard