Linux----高级IO(参考UNP)
Posted 4nc414g0n
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux----高级IO(参考UNP)相关的知识,希望对你有一定的参考价值。
高级IO(参考UNP)
五种I/O模型
Unix下有五种I/O模型:
- 阻塞式IO
- 非阻塞式IO
IO复用
(select/poll)- 信号驱动式IO(SIGIO)
- 异步IO(POSIX的aio_系列函数)
同步与异步:
- 同步和异步关注的是消息通信机制.
- 所谓同步,就是在发出一个调用时,在没有得到结果之前,该调用就不返回. 但是一旦调用返回,就得到返回值了; 换句话说,就是由调用者主动等待这个调用的结果;
- 异步则是相反, 调用在发出之后,这个调用就直接返回了,所以没有返回结果; 换句话说,当一个异步过程调用发出后,调用者不会立刻得到结果; 而是在调用发出后, 被调用者通过状态、通知来通知调用者,或通过回调函数处理这个调用
注意区分(通信的同步与异步) 与 (进程/多线程的同步和互斥):
- 进程/线程同步也是进程/线程之间直接的制约关系是为完成某种任务而建立的两个或多个线程,这个线程需要在某些位置上协调他们的工作次序而等待、传递信息所产生的制约关系. 尤其是在访问临界资源的时候
注意:
- 任何IO过程中, 都包含两个步骤. 第一是等待, 第二是拷贝. 而且在实际的应用场景中, 等待消耗的时间往往都远远高于拷贝的时间.
让IO更高效, 最核心的办法就是让等待的时间尽量少
阻塞式IO模型
图摘自UNP
阻塞IO
: 在内核将数据准备好之前, 系统调用会一直等待.所有的套接字, 默认都是阻塞方式
进程从调用recvfrom开始到返回之间处于阻塞状态
非阻塞IO模型
非阻塞IO:
如果内核还未将数据准备好, 系统调用仍然会直接返回, 并且返回EWOULDBLOCK错误码
fcntl
#include <unistd.h>
#include <fcntl.h>
int fcntl(int fd, int cmd, ... /* arg */ );
功能(cmd操作):
- 复制一个现有的描述符(cmd=F_DUPFD) .
- 获得/设置文件描述符标记(cmd=F_GETFD或F_SETFD).
- 获得/设置文件状态标记(cmd=F_GETFL或F_SETFL).
- 获得/设置异步I/O所有权(cmd=F_GETOWN或F_SETOWN).
- 获得/设置记录锁(cmd=F_GETLK,F_SETLK或F_SETLKW)
参数:
- fd:文件描述符,默认都是是阻塞IO
- cmd:进行的操作
- …:可变参数,
在cmd=F_SETFL时用,(在Linux中,cmd只能改变 O_APPEND, O_ASYNC, O_DIRECT, O_NOATIME,和 O_NONBLOCK 这些标志)
返回值:
- fcntl的返回值与命令有关。如果出错,所有命令都返回-1,如果成功则返回某个其他值。下列四个命令有特定返回值:F_DUPFD、F_GETFD、F_GETFL、F_GETOWN.第一个返回新的文件描述符,接下来的两个返回相应标志,最后一个返回一个正的进程ID或负的进程组ID----摘自百度百科
使用:
注意:
- 设置标志位的时候是按位与’|’
(fl | O_NONBLOCK)
, 参考之前的O_RDONLY, O_WRONLY, O_RDWR- 在read时,如果读不到数据,O_NDELAY会返回0,由于正常读取到文件末尾时,也会返回0,这样就无法区分是否是遗产隔离,所以就引入了O_NONBLOCK,在读不到数据时,返回-1,并且设置errno为EAGAIN,而读到结尾处,正常返回0
EWOULDBLOCK被设置为了EAGAIN,EAGAIN在宏定义中为11
void SetNoBlock(int fd) int f1=fcntl(fd, F_GETFL);//获取fd的状态标志(这里fd=0标准输入) if(f1<0) cerr<<"获取文件标记位失败..."<<endl; return; fcntl(fd, F_SETFL, f1 | O_NONBLOCK);//设置fd的状态标志 int main() char buffer[1024]; SetNoBlock(0);//将标准输入设为非阻塞 while(true) ssize_t s = read(0,buffer,sizeof(buffer)-1); if(s>0) buffer[s]=0;// \\0 cout<<"reading: "<< buffer <<endl; else if(errno==EWOULDBLOCK||errno==EAGAIN) sleep(3); cout<<"EWOULDBLOCK: "<<EWOULDBLOCK<<" "<<"EAGAIN: "<<EAGAIN<<endl; cout<<"buffer无数据可读,非阻塞..."<<endl; continue; else if(errno==EINTR) cout << "被信号中断" << endl; continue; cout<<"read error"<<endl; break; return 0;
当没有数据的时候,仍会返回
IO复用模型
IO复用模型
: 虽然从流程图上看起来和阻塞IO类似. 实际上最核心在于IO多路转接能够同时等待多个文件描述符的就绪状态
recv write recvfrom 和 select poll epoll系统调用的区别:
- recv write recvfrom: 等待+拷贝(一次只能等待一个fd)
- select poll epoll: 只有等待(一次可以等待多个fd)
select
早期头文件
/* According to earlier standards */
#include <sys/time.h>
#include <sys/types.h>
#include <unistd.h>
最新头文件
/* According to POSIX.1-2001, POSIX.1-2008 */
#include <sys/select.h>
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
功能
: 系统提供select函数来实现多路复用输入/输出模型.
- select系统调用是用来让我们的程序监视多个文件描述符的状态变化的;
- 程序会停在select这里等待,直到被监视的文件描述符有一个或多个发生了状态改变
参数 解释 及 注意:
- 参数nfds是需要监视的最大的文件描述符值+1;
- 描述符从0开始,遍历时需要遍历到nfds-1
- 虽然头文件<sys/select.h>中有FD_SIZE常量标识fd_set的文件描述符总数1024,但很少有程序用那么多(例如打开描述符1 3 4,那么nfds就是5 )
- readfds,writefds,exceptfds分别对应于需要检测的可读文件描述符的集合,可写文件描述符的集合及异常文件描述符的集合,
!!!注意: 三个都为输入输出型参数!!!
- fd_set类型: 一个位图 在Linux 2.6.39内核中的结构如下
- 提供了一组操作fd_set的接口, 来比较方便的操作位图:
void FD_CLR(int fd, fd_set *set);
// 用来清除描述词组set中相关fd 的位
int FD_ISSET(int fd, fd_set *set);
// 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set);
// 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set);
// 用来清除描述词组set的全部位
- 所有关心读事件的文件描述符,都应该添加在readfds这个集合中, 另两个同理
①输入:
用户告诉内核OS帮我检测一下在这个集合中的fd的读事件比特位的位置,代表文件描述符编号,比特位的内容代表是否关心(
假如是…1001,表示关心0号和3号文件描述符,1,2号不关心)
②输出:
内核告诉用户,你关心的fd,有那些文件描述符已经就绪可以读取了比特位的位置,代表文件描述符编号,比特位的内容代表是否就绪(
假如是…1001,表示0号和3号文件描述符就绪,1,2号未就绪)
③ 对于不关心的可以设为nullptr,比如只关心读:select(fd+1,&readset,NULL,NULL,NULL);
- 输入输出型参数是同一个变量,每次调用返回的时候都要进行重新设置
- 由于
fd_set不具有保存功能,只有互相通知 (内核与用户间) 的能力
,所以select 要被使用需要借助一个第三方数组
,用来管理有效fd(历史上获得文件描述符
), 同时也用来遍历找最大文件描述符, 用于填写nfds参数- 参数timeout为结构timeval,用来设置select()的等待时间
- timeval结构体:
- 有三种情况:
①一直等下去,直到有文件描述符准备好(timeout设为空指
针)才返回
②等待固定的时间,在不超过timeval结构体中设定的时间内有文件描述符准备好就返回,超时也返回
例如:struct timeval timeout = 3,3;
(注意
: 当时间设置太大会返回EINVAL(无效参数)错误)
③ 不等待,检查描述符后立即返回(轮询polling), timeval结构体里的tv_sec和tv_msec必须设为0
返回值:
- 成功时,select()
返回三个返回的描述符集中包含的文件描述符的数量
(即在 readfds、writefds、exceptfds 中设置的总位数,如果在任何fd就绪前超时,则为0
)- 出错时
返回-1
,设置errno表示错误; 文件描述符集未修改,并且超时变得未定义
(注意: 当错误为EINTR,认为连接是正常的,继续接收,不需要重新FD_ZERO(),FD_SET()
)
读就绪 写就绪 异常就绪:
读就绪:
- socket内核中, 接收缓冲区中的字节数, 大于等于低水位标记SO_RCVLOWAT. 此时可以无阻塞的读该文件描述符, 并且返回值大于0;
- socket TCP通信中, 对端关闭连接, 此时对该socket读, 则返回0;
- 监听的socket上有新的连接请求;
- socket上有未处理的错误;
写就绪
- socket内核中, 发送缓冲区中的可用字节数(发送缓冲区的空闲位置大小), 大于等于低水位标记SO_SNDLOWAT, 此时可以无阻塞的写, 并且返回值大于0;
- socket的写操作被关闭(close或者shutdown). 对一个写操作被关闭的socket进行写操作, 会触发SIGPIPE信号;
- socket使用非阻塞connect连接成功或失败之后;
- socket上有未读取的错误;
异常就绪(
?????
)
- socket上收到带外数据. 关于带外数据, 和TCP紧急模式相关(TCP协议头中, 有一个紧急指针的字段),
使用 :(select服务器)
只处理读就绪
sock.hpp(封装TCPserver的 初始化 及 listen bind)namespace Sock //const int g_default=8080; enum SOCKETERROR = 2, BINDERROR, LISTENERROR ; const int backlog = 5;//listen 最大连接数 class ns_Sock public: static int SockInit() int sock=socket(AF_INET, SOCK_STREAM, 0); if(sock<0) cerr<<"SOCK error"<<endl; exit(SOCKETERROR); int opt=1;//解决断开连接TIMEWAIT状态 setsockopt(sock, SOL_SOCKET ,SO_REUSEADDR, &opt, sizeof(opt)); return sock; static void SockBind(ssize_t sock, u_int16_t port) struct sockaddr_in local; bzero(&local,sizeof(local)); //memset(&local, 0, sizeof(local)); local.sin_family=AF_INET; local.sin_port=htons(port); local.sin_addr.s_addr=INADDR_ANY; socklen_t len = sizeof(local); if((bind(sock, (struct sockaddr*)&local,len))<0) cerr<<"bind error"<<endl; exit(BINDERROR); static void SockListen(ssize_t sock) if(listen(sock, backlog)<0) cout<<"listen error"<<endl; exit(LISTENERROR); static void SockAccept();//未封装 ;
select_tcp_server.hpp
思路:
- 服务器最开始的时候是只有一个fd: listen_sock
- 初始化:初始化第三方数组fd_array_[NUM]用来存储和管理有效fd(NUM为最大可存储fd数 (sizeof(fd_set)*8))全初始化为-1
- selectserverInit函数定义一个Sock::ns_Sock* nsck,调用封装的SockInit(),listen,bind,获得listen_sock_,同时将fd_array_[0]设为listen_sock_
- 循环select,
注意每次循环都要先 FD_ZERO(&rfds); 同时循环重新 FD_SET 将管理的有效fd添加到位图 rfds
(循环顺便获得max_fd)- 进行select,一旦读就绪 (新连接到来,连接断开,连接发送数据 …) 就返回
- select正常返回就调用HandlerEvent() ,进行
分情况处理读就绪的各种情况
- 遍历fd_array_数组,当FD_ISSET(fd_arrary_[i],&rfds)为真
- 如果fd_arrary_[i] == listen_sock_( listen_sock_文件描述符读就绪)代表新连接到来,需要进行accept (
注意要判满,是否可以增加新连接
)
- 如果是其他情况就代表有新数据到来,但是仍分为三种情况
1. 正常数据(recv返回值大于0)
2. 断开连接(recv返回值为0)
3. 异常
- 写就绪和异常就绪同理(
MARK
)
select特点及缺点
select特点:
- 将fd加入select监控集的同时,还要再使用一个数据结构array保存放到select监控集中的fd
用于再select 返回后, array作为源数据和fd_set进行FD_ISSET判断
select返回后会把以前加入的但并无事件发生的fd清空,则每次开始select前都要重新从array取得fd逐一加入(FD_ZERO最先),扫描array的同时取得fd最大值maxfd,用于select的第一个参数
select缺点
- 每次调用select, 都需要手动设置fd集合, 从接口使用角度来说也非常不便.
- 每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
- 同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
- select支持的文件描述符数量太小,有FD_SETSIZE最大值1024,也就是说,socket函数可以新建出很多fd,这个fd号可能超出FD_SETSIZE,在这种情况下,使用fd_set结构和select函数就会出问题
poll
poll解决了select的一些问题:
- 解决了:检测的文件描述符有上限的问题
- 解决了:将输入和输出分离,解决编码的时候,必须的重新设置关心的文件描述符,poll不在需要每次都重新设置
epoll
epoll是为处理大批量句柄而作了改进的poll
系统调用
参考好文:图解 | 深入揭秘 epoll 是如何实现 IO 多路复用的!
epoll_create
#include <sys/epoll.h>
int epoll_create(int size);
int epoll_create1(int flags);
功能
:创建一个epoll的句柄
- 自从linux2.6.8之后, size参数是被忽略的.
- 用完之后, 必须调用close()关闭
参数:
size
:obslotete(过时了),忽略flag
:设为0和epoll_create()一样
返回值:
- 成功时,这些系统调用会返回一个非负文件描述符。出错时返回-1,设置errno表示错误
底层实现:
- do_epoll_create创建一个eventpoll对象 (ep_alloc进行初始化工作)
- eventpoll结构如下 (wq为等待队列,rdlist为就绪队列, rbr为红黑树,底层使用了一棵红黑树来管理socket)
- 结构可表现为(
注意:struct file中有一个private_data指针指向eventpoll对
象):
epoll_ctl
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
功能
:epoll的事件注册函数.
- 它不同于select()是在监听事件时告诉内核要监听什么类型的事件, 而是在这里先注册要监听的事件类型.
参数:
epfd
:epoll_create()的返回值(epoll的句柄).op
:第表示动作,用三个宏来表示.
- EPOLL_CTL_ADD进行注册
- EPOLL_CTL_DEL移除fd
- EPOLL_CTL_MOD修改已注册的fd
fd
:第三个参数是需要监听的fd.event
:告诉内核需要监听什么(events 成员是一个bit mask,由以下可用事件类型中的零个或多个 ORing 组成)
Linux内核经常使用位操作来设置
epoll_event 结构如下:typedef union epoll_data void *ptr; int fd; uint32_t u32; uint64_t u64; epoll_data_t; struct epoll_event uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ __EPOLL_PACKED;
uint32_t events:
- EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
- EPOLLOUT : 表示对应的文件描述符可以写;
- EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
- EPOLLERR : 表示对应的文件描述符发生错误;
- EPOLLHUP : 表示对应的文件描述符被挂断;
- EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
- EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里
返回值:
- 成功时,epoll_ctl() 返回零。 发生错误时,epoll_ctl() 返回 -1 并适当设置 errno
底层实现:
- do_epoll_ctl,获取到eventpoll fd,目标fd等信息
- do_epoll_ctl内,如果是
EPOLL_CTL_ADD
操作,会调用ep_insert进行注册,EPOLL_CTL_DEL会调用ep_remove进行移除fd,EPOLL_CTL_MOD调用ep_modify进行修改已经注册的fd的监听事件
- epitem是红黑树的节点结构如下:
注意:红黑树的Key是 struct epoll_filefd , Value是 struct epitem
- ep_insert函数分配并初始化epitem,其结构内的ffd设为目标socket的fd,同时让*ep指向eventpoll对象… …
关于ep_item_poll调用的内部细节:
参考好文:
图解 | 深入揭秘 epoll 是如何实现 IO 多路复用的!
谈谈Linux epoll惊群问题的原因和解决方案
- 所有添加到epoll中的事件都会与设备(网卡)驱动程序建立回调关系,也就是说,当响应的事件发生时会调用这个回调方法ep_poll_callback: 发生事件即调用ep_poll_callback回调函数,该回调函数会将自己这个epitem加入到eventpoll的就绪队列中
epoll_wait (MARK一下)
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
int epoll_pwait(int epfd, struct epoll_event *events, int maxevents, int timeout, const sigset_t *sigmask);
功能:
- 收集在epoll监控的事件中已经发送的事件
参数:
epfd
:epoll的句柄events
:分配好的epoll_event结构体数组.
(epoll将会把发生的事件赋值到events数组中 (events不可以是空指针,内核只负责把数据复制到这个events数组中,不会去帮助我们在用户态中分配内存).)maxevents
:告知内核这个events有多大,这个 maxevents的值不能大于创建epoll_create()时的size.timeout
:超时时间 (毫秒, 0会立即返回, -1是永久阻塞).
返回值:
- 如果函数调用成功,返回对应I/O上已准备好的文件描述符数目,如返回0表示已超时, 返回小于0表示函数失败
底层实现:
do_epoll_wait:
ep_poll中的init_waitqueue_entry定义等待任务current (源码中没找到??)
ep_poll中的__add_wait_queue_exclusive
没有就绪的socketcurrent添加到eventpoll对象的等待队列(
为后来的回调ep_poll_callback检查eventpoll对象上的等待队列里是否有等待项,以唤醒 做准备
)进入休眠,schedule_hrtimeout_range()内选择下一个进程调度
当软中断:socket接收数据到接收队列,插入epoll的就绪队列rdlist, 唤醒休眠的进程,epoll_wait继续执行,调用ep_send_events()将就绪的events和data发送到用户空间,通过epoll_wait的参数struct epoll_event *events,
详见下面回调机制
ep_send_events()返回0且未超时,retry
可大致表现为:
epoll_wait调用ep_poll,当rdlist为空(无就绪fd)时挂起当前进程,直到rdlist不空时进程才被唤醒。然后就将就绪的events和data发送到用户空间(ep_send_events()),如果ep_send_events()返回的事件数为0,并且还有超时时间剩余(timeout),那么我们retry,期待不要空手而归。
底层的回调机制
具体细节见:图解 | 深入揭秘 epoll 是如何实现 IO 多路复用的!
- 当 socket 上数据就绪时候,内核将以 sock_def_readable 这个函数为入口,找到在epoll_ctl时设置的回调函数ep_poll_callback
2.调用ep_poll_callback(), 将epitem添加到eventpoll的rdlist, 并检查eventpoll的等待队列,有就唤醒(current)
- 调用current的func(当时在init_waitqueue_entry内设置的),唤醒current的private指针指向当前(current) task_struct
- epoll_wait继续执行,ep_send_events()将就绪的events和data发送到用户空间
简单的epoll_server(关心读 和 写事件)
以select和poll server为模板修改:
私有成员
:
int epfd_
;(创建的epoll模型对应的fd)
初始化:
//SockInit() //bind //listen //监听fd放在第三方数组首位 epfd_ = epoll_create(128); //创建epoll模型 if(epfd_<0) cerr<<"epoll_create error"<<endl; exit(1);
Loop:
struct epoll_event:
typedef union epoll_data void *ptr; int fd; uint32_t u32; uint64_t u64; epoll_data_t; struct epoll_event uint32_t events; /* Epoll events */ epoll_data_t data; /* User data variable */ __EPOLL_PACKED;
循环前的准备:
int timeout=1000;//ms 设为0:轮询检测 struct epoll_event ev; ev.events = EPOLLIN;//链接到来都是EPOLLIN事件 ev.data.fd=listen_sock_; epoll_ctl(epfd_, EPOLL_CTL_ADD, listen_sock_, &ev);//添加listen套接字 #define EV_NUM 10 struct epoll_event revs[EV_NUM];//epoll_wait的参数:结构体数组 //epoll_wait函数内部会将就绪的epoll_event(event+data(fd)) ,copy放入这个数组
void HandlerEvent(struct epoll_event revs[],int rd_num):
for(int i=0;i<rd_num;i++)//循环从revs中获取就绪事件 //获取当前fd int sock = revs[i].data.fd; uint32_t event = revs[i].events; //读事件就绪 if(event & EPOLLIN) //listen读事件就绪 if(sock == listen_sock_) //accept... //注意一定不能直接读取数据,这里只有epoll知道是否有数据 struct epoll_event ev; ev.events = EPOLLIN; ev.data.fd=accept_sock_; //新listen的fd添加到红黑树里 epoll_ctl(epfd_, EPOLL_CTL_ADD, accept_sock_, &ev); cout<<"新连接获取成功"<<endl; //普通读取事件 else char buffer[1024]; ssize_t s= recv(sock, buffer, sizeof(buffer)-1, 0); if(s>0) buffer[s]=0; cout<<"client$ "<<buffer<<endl; //添加关心写入事件 struct epoll_event ev; ev.events = EPOLLIN | EPOLLOUT; ev.data.fd=sock; epoll_ctl(epfd_, EPOLL_CTL_MOD, sock, &ev); else if(s==0) cout<<"client quit..."<<sock<<endl; close(sock); epoll_ctl(epfd_, EPOLL_CTL_DEL, sock, nullptr); else cout<<"recv error..."<<endl; close(sock); epoll_ctl(epfd_, EPOLL_CTL_DEL, sock, nullptr); //写事件就绪 else if(event & EPOLLOUT) string msg = "OK"; send(sock, msg.c_str(), msg.size(), 0); //取消对写事件的关心(读事件是长关心, 写事件是按需关心) struct epoll_event ev; ev.events = EPOLLIN;//重新将sock改为读事件 ev.data.fd=sock; epoll_ctl(epfd_, EPOLL_CTL_MOD, sock, &ev); //其他或异常 // if(event & EPOLLHUP || event & EPOLLERROR) //
epoll工作方式
水平触发Level Triggered 工作模式
注意
:select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET
epoll默认状态下就是LT工作模式.
- 当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.
- 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait
- 仍然会立刻返回并通知socket读事件就绪.
- 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回.
- 支持阻塞读写和非阻塞读写
边缘触发Edge Triggered工作模式
当ocket添加到epoll描述符的时候使用了EPOLLET标志, epoll进入ET工作模式
.
- 当epoll检测到socket上事件就绪时, 必须立刻处理.
以上是关于Linux----高级IO(参考UNP)的主要内容,如果未能解决你的问题,请参考以下文章