网络 一篇博文搞懂五种常见的IO模型
Posted WhiteShirtI
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了网络 一篇博文搞懂五种常见的IO模型相关的知识,希望对你有一定的参考价值。
概念前情
阻塞:为了完成一个功能,发起调用,若不具备完成功能的条件,则调用一直阻塞等待
非阻塞:为了完成一个功能,发起调用,若不具备完成功能的条件,则立即返回一个
阻塞与非阻塞的区别:常用于讨论函数是否阻塞,表示这个函数无法立即完成功能时是否立即返回
同步:任务完成的流程通常是串行化的,并且任务由进程自身完成
异步:任务完成的流程通常是不确定的,并且任务由系统完成
同步与异步的区别:通常用于讨论功能的完成方式,表示一个功能是否是顺序化且是否由自己来完成
异步的种类:异步阻塞----等待系统完成功能。异步非阻塞----不等待系统完成功能
同步好还是异步好?:同步处理流程简单,同一时间占用资源少;而异步处理效率高,但占用资源多
IO完成过程:1、等待IO就绪(满足IO的条件)2、进行数据拷贝
阻塞IO
阻塞IO就是调用者发起IO请求调用,由于请求调用的条件不满足,就会一直等待,直到条件满足
用户线程通过系统调用recvfrom发起IO读操作,由用户空间转到内核空间。内核等到数据报,待数据报到达后,然后将接收的数据拷贝到用户空间,完成recvfrom操作。
优缺点:阻塞IO模型的流程非常简单,代码操作也简单,任务的操作都是顺序的。但是该模型也有很明显的缺点:无法充分利用资源,任务处理的效率比较低
非阻塞IO
每次用户询问内核是否有数据报准备好(文件描述符缓冲区是否就绪),当数据报准备好的时候,就进行拷贝数据报的操作。当数据报没有准备好的时候,也不阻塞程序,内核直接返回未准备就绪的信号,等待用户程序的下一次轮询
优缺点:与阻塞IO相比,充分利用了IO等待时间,提高了任务处理的效率。但是流程相对于阻塞IO较为复杂,需要循环访问处理,且响应不够实时,只有等待事情办完后才能循环回去重新发起IO
信号驱动IO
通过信号定义IO就绪的处理方式,当收到信号(SIGIO)时表示IO已就绪,此时就可以在处理方式中发起IO调用
优缺点:相对非阻塞IO,处理IO更加实时,资源也得到充分利用。但是处理流程更加复杂,需要定义信号的处理方式,且流程既有主控流程也有信号处理流程,也涉及到信号是否可靠的问题
异步IO
IO处理顺序不确定,IO(等待+数据拷贝)都由操作系统来完成。自定义IO完成信号处理方式,发起异步调用,告诉操作系统要完成指定功能,剩下的IO功能完全由操作系统完成,完成后通过信号通知进程
异步IO使用的不再是read和write的系统接口了,应用工程序调用aio_XXXX系列的内核接口。当应用程序调用aio_read的时候,内核一方面去取数据报内容返回,另外一方面将程序控制权还给应用进程,应用进程继续处理其他事务。这样应用进程就是一种非阻塞的状态。当内核的数据报就绪的时候,是由内核将数据报拷贝到应用进程中,返回给aio_read中定义好的函数处理程序。
优缺点:充分利用资源,高效率地处理任务。消耗大部分资源,流程非常复杂
多路转接IO
多路转接IO也称IO复用,主要用于对大量的描述符进行IO就绪事件监控,能够让我们的进程只针对就绪了的描述符进行IO操作
IO的就绪事件分为三种
1、可读事件:一个描述符对应的缓冲区中有数据可读
2、可写事件:一个描述符对应的缓冲区中有剩余空间可以写入数据
3、异常事件:一个描述符发生了某些特定的异常信息
只对就绪的描述符进行IO操作的好处----避免阻塞,且提高处理效率
1、在默认的socket中,例如tcp一个服务端只能与一个客服端的socket通信一次,因为我们不知道哪个客户端新建的socket有数据到来或者监听socket有新连接,有可能就会对没有没有新连接到来的监听socket进行accept操作而阻塞,或者对没有数据到来的普通socket进行recv阻塞
2、在tcp服务端中,将所有的socket设置为非阻塞,若没有数据到来,则立即报错返回,进行下一个描述符的操作,这种操作中,有一个不好的地方,就是也对没有就绪事件的描述符进行操作,降低了处理效率
在实现多路转接IO中,操作系统提供了三种模型,分别为select模型、poll模型和epoll模型
适用场景:适用于有大量描述符需要监控,但是同一时间只有少量描述符活跃的场景
select模型
使用流程
一、定义想要监控的事件的描述符集合初始化集合,将需要监控指定事件的描述符添加到指定事件的描述符集合中
集合:是一个
fd_set
结构体,结构体中只有一个成员,是一个数组,这个数组主要是作为位图进行使用。向集合中添加一个描述符,描述符就是一个数字,添加描述符其实就是将这个数字对应的比特位置1,表示置为1的位置对应的描述符被添加到集合中了。这个数组中有多少个比特位或者说select最多能监控多少描述符,取决于宏__FD_SETSIZE
,默认为1024
代码操作分为三步
1、定义指定事件的集合 fd_set rfds
----可读事件集合
2、初始化集合 void FD_ZERO(fd_set *set)
----清空指定的描述符集合
3、将需要监控的事件的描述符添加到集合中void FD_SET(int fd, fd_set *set)
----将fd描述符添加到set集合中
二、发起调用,将集合中的数据拷贝到内核中进行监控,监控采用轮询遍历判断方式进行
接口:int select(int nfds, fd_set *readfds, fd_set *writefds,fd_set *exceptfds, const struct timespec *timeout,const sigset_t *sigmask)
参数内容(nfds
:集合中最大描述符的数值+1,目的是为了减少内核中遍历次数;readfds
:可读事件集合;writefds
:可写事件集合;exceotfds
:异常事件集合;timeout
:select默认是一个阻塞操作< struct timeval{tv_sec; tv_usec} >,若timeout=NULL表示永久阻塞,直到有描述符就绪才返回;若timeout中的数据为0,则表示非阻塞,没有就绪就立即返回;若timeout有数据,若指定时间内没有描述符就绪则超时返回;返回值
:返回值小于0表示监控出错,等于0表示没有描述符就绪,大于0表示就绪的描述符的个数)
select会在返回前将所有集合中没有就绪的描述符都给从集合中移除出去(调用返回后,集合中保存的都是就绪的描述符)
三、select调用返回后,进程遍历哪些描述符还在哪些集合中,就可以知道哪些描述符就绪了哪些事件,进而进行对应操作
四、其他操作:void FD_CLR(int fd, fd_set *set)
从set集合中移除fd描述符;int FD_ISSET(int fd, fd_set *set)
判断fd描述符是否在set集合中
简单实例:通过对标准输入的监控,体会监控的作用
#include <stdio.h>
#include <unistd.h>
#include <stdio.h>
#include <sys/select.h>
int main()
{
char buf[1024] = {0};
while (1)
{
//1.定义集合,每次监控都需要重新添加描述符
fd_set set;
FD_ZERO(&set);//初始化清空集合
FD_SET(0, &set);//将标准输入描述符添加到集合中
struct timeval tv;
tv.tv_sec = 3;
tv.tv_usec = 0;
//2.发起调用
printf("start monitoring\\n");
//select返回时会删除集合中没有就绪的描述符
int ret = select(1, &set, NULL, NULL, &tv);
if (ret < 0)
{
perror("select error");
return -1;
}
else if (ret == 0)
{
//没有描述符就绪的情况下返回的就是超时
printf("monitoring time out\\n");
continue;
}
printf("descriptor ready or timeout waiting\\n");
if (FD_ISSET(0, &set))//当描述符还在集合中,则表示该描述符已就绪
{
printf("start reading data\\n");
char buf[1024] = {0};
ret = read(0, buf, 1023);
if (ret < 0)
{
perror("read error");
return -1;
}
printf("buf:%s\\n", buf);
}
}
return 0;
}
运行截图:
进阶实例:实现封装一个并发的服务器。封装一个Select类,每一个实例化的对象都是一个监控对象,向外提供简单接口,可以监控大量的描述符,并且直接能够在外部获取到就绪的描述符;不需要让用户知道select是如何进行的,不需要用户在外部进行复杂的操作
这里tcp服务器我们用之前写过的代码
网络 TCP协议(C++代码|通过tcp协议实现客户端与服务端之间的通信)
#include <cstdio>
#include <unistd.h>
#include <string>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <sys/socket.h>
using namespace std;
//该值表示用一时间能够接收多少客户端连接
//并非指整个通信最多接收多少客户端连接
#define MAX_LISTEN 5
#define CHECK_RET(q) if((q) == false){return -1;}
class TcpSocket
{
public:
TcpSocket()
:_sockfd(-1)
{}
int GetFd()
{
return _sockfd;
}
void SetFd(int fd)
{
_sockfd = fd;
}
bool Socket()
{
_sockfd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP);
if (_sockfd < 0)
{
perror("socket error");
return false;
}
return true;
}
bool Bind(const string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = bind(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0)
{
perror("bind error");
return false;
}
return true;
}
bool Listen(int backlog = MAX_LISTEN)
{
int ret = listen(_sockfd, backlog);
if (ret < 0)
{
perror("listen error");
return false;
}
return true;
}
bool Accept(TcpSocket *new_sock, string *ip = NULL, uint16_t *port = NULL)
{
struct sockaddr_in addr;
socklen_t len = sizeof(struct sockaddr_in);
int new_fd = accept(_sockfd, (struct sockaddr*)&addr, &len);
if (new_fd < 0)
{
perror("accept error");
return false;
}
new_sock->_sockfd = new_fd;
if (ip != NULL)
{
*ip = inet_ntoa(addr.sin_addr);
}
if (port != NULL)
{
*port = ntohs(addr.sin_port);
}
return true;
}
bool Recv(string *buf)
{
char tmp[4096] = {0};
int ret = recv(_sockfd, tmp, 4096, 0);
if (ret < 0)
{
perror("recv error");
return false;
}
else if (ret == 0)//默认阻塞,没有数据就会等待,返回0表示连接断开
{
printf("connection broken\\n");
return false;
}
buf->assign(tmp, ret);
return true;
}
bool Send(const string &data)
{
int ret = send(_sockfd, data.c_str(), data.size(), 0);
if (ret < 0)
{
perror("send error");
return false;
}
return true;
}
bool Close()
{
if (_sockfd > 0)
{
close(_sockfd);
_sockfd = -1;
}
return true;
}
bool Connect(const string &ip, uint16_t port)
{
struct sockaddr_in addr;
addr.sin_family = AF_INET;
addr.sin_port = htons(port);
addr.sin_addr.s_addr = inet_addr(ip.c_str());
socklen_t len = sizeof(struct sockaddr_in);
int ret = connect(_sockfd, (struct sockaddr*)&addr, len);
if (ret < 0)
{
perror("connect error");
return false;
}
return true;
}
private:
int _sockfd;
};
对select进一步得封装
#include <iostream>
#include <vector>
#include <sys/select.h>
#include "tcpsocket.hpp"
using namespace std;
#define MAX_TIMEOUT 3000
class Select
{
public:
Select()
:_maxfd(-1)
{
FD_ZERO(&_rfds);
}
bool Add(TcpSocket& sock)
{
int fd = sock.GetFd();
FD_SET(fd, &_rfds);
//每次添加新的描述符都需要判断最大描述符
_maxfd = _maxfd > fd ? _maxfd : fd;
return true;
}
bool Del(TcpSocket& sock)
{
int fd = sock.GetFd();
FD_CLR(fd, &_rfds);
for (int i = _maxfd; i >= 0; --i)
{
//移除之后,从后往前判断第一个还在集合中的描述符,找到就是最大的
if (FD_ISSET(i, &_rfds))
{
_maxfd = i;
break;
}
}
return true;
}
//进行监控,并且直接向外提供就绪的tcpSocket
bool Wait(vector<TcpSocket> *list, int outTime = MAX_TIMEOUT)
{
struct timeval tv;
tv.tv_sec = outTime / 1000;//outTime以毫秒为单位
tv.tv_usec = (outTime % 1000) * 1000;//计算剩余的微妙
fd_set set;
set = _rfds;//不能直接使用本类中的_rfds,因为select会修改集合中的描述符
int ret = select(_maxfd+1, &set, NULL, NULL, &tv);
if (ret < 0)
{
perror("select error");
return false;
}
else if (ret == 0)
{
printf("wait timeout\\n");
return true;
}
for (int i = 0; i < _maxfd+1; ++i)
{
if (FD_ISSET(i, &set))
{
//还在集合中表示已就绪
TcpSocket sock;
sock.SetFd(i);
list->push_back(sock);
}
}
return true;
}
private:
//可读事件集合,保存要监控的可读事件描述符
fd_set _rfds;
//每次输入都需要输入最大描述符
int _maxfd;
};
客户端也是之前的代码
#include <iostream>
#include <string>
#include <signal.h>
#include "tcpsocket.hpp"
using namespace std;
void sigcb(int no)
{
printf("recv no: %d\\n", no);
}
int main(int argc, char *argv[])
{
if (argc != 3)
{
cout << "Usage: ./tcp_cli ip port" << endl;
return -1;
}
signal(SIGPIPE, sigcb);
string srv_ip = argv[1];
uint16_t srv_port = stoi(argv[2]);
TcpSocket sock;
CHECK_RET(sock.Socket());
CHECK_RET(sock.Connect(srv_ip, srv_port));
while (1)
{
string buf;
cout << "client say: ";
cin >> buf;
sock.Send(buf);
buf.clear();
sock.Recv(&buf);
cout << "server say: "<< buf << endl;
}
sock.Close();
return 0;
}
这是我们新写的主程序:
#include <iostream>
#include "select.hpp"
using namespace std;
int main(int argc, char * argv[])
{
if (argc != 3)
{
cout << "Usage: ./main ip port" << endl;
return -1;
}
string ip = argv[1];
uint16_t port = stoi(argv[2]);
TcpSocket lst_sock;
CHECK_RET(lst_sock.Socket());
CHECK_RET(lst_sock.Bind(ip, port));
CHECK_RET(lst_sock.Listen());
Select s;
s.Add(lst_sock);
while (1)
{
vector<TcpSocket> list;
bool ret = s.Wait(&list);
if (ret == false)
{
continue;
}
for (auto sock : list)
{
if (sock.GetFd() == lst_sock.GetFd())
{
//就绪的描述符与监听套接字描述符一样,就表示需要获取新连接
TcpSocket new_sock;
ret = lst_sock.Accept(&new_sock);
if (ret == false)
continue;
s.Add(new_sock);//将新建套接字也添加监控
}
else
一篇文搞懂《AOP面向切面编程》是一种什么样的体验?