I/O多路复用之epoll

Posted TangguTae

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了I/O多路复用之epoll相关的知识,希望对你有一定的参考价值。

目录

epoll的底层原理

epoll对应的接口

epoll_create

epoll_ctl

epoll_wait

epoll的工作方式

水平触发(LT)

边缘触发(ET)

基于epoll的服务器

epoll的优点


相比较select和poll,epoll的设计目的是为了解决前者面临大量文件描述符时性能的下降。它的性能延展性比select和poll要高很多。

epoll的底层原理

epoll底层最重要的两个结构:一个红黑树,一个双向链表

当某一进程调用epoll_create时,Linux内核会创建一个eventpoll结构体,在这个结构体中存在两个成员,一个是struct rb_root rbr;一个是struct list_head rdlist;分别对应红黑树和双向链表。

struct eventpoll
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;            //红黑树的根节点
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;     //双向列表的头结点
....
;

用户进程添加自己想要关心的事件,通过epoll_ctl接口。这些事件都会挂载在红黑树中,正因为如此,epoll可以处理海量的事件的增删查改。

当某个事件准备就绪时,事件会调用回调方法ep_poll_callback,将事件添加到双向链表当中。所以,内核只需检测双向链表中是否为空就可以判断是否有事件就绪。这样可以避免遍历所有的事件,提高效率。

在epoll中,对于每一个事件,都会建立一个epitem结构体。

struct epitem
struct rb_node rbn;//红黑树节点
struct list_head rdllink;//双向链表节点
struct epoll_filefd ffd; //事件句柄信息
struct eventpoll *ep; //指向其所属的eventpoll对象
struct epoll_event event; //期待发生的事件类型

每次添加事件到epoll模型当中,即调用epoll_ctl 执行EPOLL_CTL_ADD,就会将epitem添加到红黑树当中。当内核IO准备就绪,将epitem添加到双向链表当中。

下面是一张比较经典的图,用来形容几个结构之间的关系。

 


epoll对应的接口

epoll总共有三组接口分别是epoll_create,epoll_ctl和epoll_wait。头文件是sys/epoll.h

epoll_create

功能:创建epoll模型(实例)

最开始来说size的目的是为了告知epoll模型来检查文件描述符的个数,并不是一个上限,而是告诉内核应该如何为内部数据结构划分初始大小。但是在Linux2.6.8以后,他的参数size是可以被忽略的,但是必须得大于0.

返回值为一个文件描述符,也就是该epoll模型的epfd,在后面的epoll_ctl 和 epoll_wait接口中都得用到该参数。

epoll_ctl

对epoll事件注册、修改、删除的接口

第一个参数就是epoll模型所对应的文件描述符。

第二个参数op用来指定需要执行的操作,可以是如下的三种值

1、EPOLL_CTL_ADD

该选项是让epoll_ctl 将文件描述符添加到epoll模型当中,文件描述符上所关心的事件由epoll_events参数所决定。

2、EPOLL_CTL_MOD

修改文件描述符上所设定的事件。

3、EPOLL_CTL_DEL

将该文件描述符从epoll模型中移除。

第三个参数为所关心的文件描述符。

第四个参数为epoll_events结构体。

epoll_events结构体

struct epoll_event 
    uint32_t     events;      /* Epoll events */
    epoll_data_t data;        /* User data variable */
;

typedef union epoll_data 
    void        *ptr;
    int          fd;
    uint32_t     u32;
    uint64_t     u64;
 epoll_data_t;

其中 epoll_event 中的events是我们所想关心改文件描述符上的哪些事件。

下面的联合体一般只用对应的fd成员。

events所对应的集合

EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);
EPOLLOUT : 表示对应的文件描述符可以写;
EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);
EPOLLERR : 表示对应的文件描述符发生错误;
EPOLLHUP : 表示对应的文件描述符被挂断;
EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.
EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要再次把这个socket加入到EPOLL队列里.

返回值

 0代表成功,-1代表发生错误。

epoll_wait

事件的等待

epoll_wait用来返回epoll模型中处于就绪态的文件描述符信息。

第一个参数和之前一样。

第二个参数为输出型参数,表明有哪些事件已经就绪,都会在events数组当中。

第三个参数maxevents为数组events的长度,该数组的空间由用户去申请,数组的长度需要告知内核。

第四个参数为等待时间,用法和poll的timeout是一样的。timeout>0表示一次等待的时间,=0表示非阻塞等待,=-1表示阻塞等待。

返回值

当返回值大于0时,表示的意思是有多少个文件描述符上的事件已经就绪。这样我们就知道当等待成功时,events数组的长度是多少。返回值为0表示等待超时。返回值小于0表示等待出错。


epoll的工作方式

epoll有两种工作方式:水平触发(LT)和边缘触发(ET)

水平触发(LT)

水平触发通知:如果文件描述符上可以非阻塞地执行 I/O 系统调用,此时认为它已经就绪。

通俗一点来说,只要文件描述符处于就绪状态(有数据可读或者有空闲的缓冲区可写)系统就会一直通知用户层赶紧来处理。

LT是epoll默认的通知方式,由于水平触发方式当文件描述符就绪时,不一定要立即把事件执行完毕,因为可以重复多次的检查IO状态。但是性能上将会比下面的这种触发方式通常低一点(epoll_wait返回的次数增多)。

边缘触发(ET)

边缘触发通知:如果文件描述符自上次状态检查以来有了新的I/O 活动(比如新的输入),此时需要触发通知。

通俗来说就是当文件描述符上的事件就绪,只会通知一次,告诉你已经就绪了,赶紧来处理。

由于这种情况下,要尽可能的多执行IO,不然将会失去执行IO的机会。这样就会面临一些问题,数据无法完全读上来(读缓存满)或者数据无法完整的写入到缓冲区(缓冲区没空间),下一次就不会通知该事件就绪,直到新的IO事件的到来。这样就会少读或者少写的问题。

所以说如何将数据完整的读上来或者写入到缓冲区就是ET模式下需要解决的问题。

解决方法:

以读数据为例

1、循环读取,如果recv的返回一直大于0,表明还有数据可读。如果文件描述符为阻塞模式的话,最后一次数据读取完毕,下一次在读取就会被阻塞住。如果是单线程的情况下整个进程会被阻塞住。系统以后的IO都无法执行(见2)。

2、将文件描述符设置为非阻塞。一直读,直到recv返回值小于0,然后检查对应的错误码。如果错误码是EAGAIN或者是EWOULDBLOCK表明数据已经读取完毕。

当然了,写数据可能有些特殊,可能是缓冲区已经满了,但是数据还未写完,此时send函数也会返回-1,错误码被置为EAGAIN或者是EWOULDBLOCK,此时需要增加判断的逻辑,判断是否已经发送完,如果没有发送完再次使能一次该文件描述符对应的写事件,会让epoll_wait监听一次。

一张图说明ET和LT的触发方式。

 


基于epoll的服务器

下面的代码是采用默认的LT触发方式。

#include<iostream>                                               
#include<sys/epoll.h>
#include<cstring>
#include<unistd.h>
#include<sys/socket.h>
#include<sys/types.h>
#include<netinet/in.h>
#include<arpa/inet.h>
using namespace std;

const int NUM = 64;
class EpollServer

  private:
    int lsock;
    uint16_t port;
    int epfd;
  public:
    EpollServer(uint16_t _port)
    
      port = _port;
    
    ~EpollServer()
    
      if(lsock > 0)
        close(lsock);
      if(epfd > 0)
        close(epfd);
    
    void InitServer()//初始化服务器
    
      lsock = socket(AF_INET,SOCK_STREAM,0);
      if(lsock < 0)
       
        cerr<<"sock error!"<<endl;
        exit(-1);
      
      sockaddr_in local;
      local.sin_family = AF_INET;
      local.sin_port = htons(port);
      local.sin_addr.s_addr = INADDR_ANY;
      if(bind(lsock,(struct sockaddr*)&local,sizeof(local)) < 0)
      
        cerr<<"bind error!"<<endl;
        exit(-1);
      
      if(listen(lsock,5) < 0)
      
        cerr<<"listen error!"<<endl;
        exit(-1);
      
      if((epfd = epoll_create(256)) < 0)//先创建epoll事件
      
        cerr<<"epoll create error!!!"<<endl;
        exit(-1);
      
    
    void ServerStart()
    
      //先关心lsock套接字
      AddEvent(lsock,EPOLLIN);//关心监听套接字上的读事件                    
      struct epoll_event revs[NUM];//一旦有事件发生就会存入改数组中
      int timeout = 10000;
      while(true)
      
        int ret = epoll_wait(epfd,revs,NUM,timeout);
        if(ret > 0)
        
          cout<<"事件发生..."<<endl;
          HandEvents(ret,revs);
        
        else if(ret == 0)
        
          cout<<"timeout ..."<<endl;
        
        else 
        
          cerr<<"epoll wait error!"<<endl;
        
      

    
  private:
    void AddEvent(int sock, uint32_t event)//添加事件
    
      struct epoll_event ev;
      ev.events = 0;
      ev.events |= event;
      ev.data.fd = sock;
      if(epoll_ctl(epfd,EPOLL_CTL_ADD,sock,&ev) < 0)
      
        cerr<<"Add event error!!!"<<endl;
        exit(-1);
      
    
    void DeleteEvent(int sock)//从epoll模型中删除sock对应的事件
    
      if(epoll_ctl(epfd,EPOLL_CTL_DEL,sock,nullptr) < 0)
      
        cerr<<"delete event error!"<<endl;
      
    
    void HandEvents(int num, struct epoll_event* revs)
    
      for(int i=0; i < num; i++)
      
        int sock = revs[i].data.fd;
       if(revs[i].events & EPOLLIN)//如果是读事件的就绪
       

        if(sock == lsock)//如果是监听套接字
        
          //新连接的到来
          struct sockaddr_in remote;
          socklen_t len = sizeof(remote);
          int sk = accept(lsock,(struct sockaddr*)&remote,&len);
          if(sk >= 0)
          
            cout<<"get a new link "<<inet_ntoa(remote.sin_addr)<<": "<<ntohs(remote.sin_port)<<endl;
            AddEvent(sk,EPOLLIN);//添加新的事件到epoll模型中
          
          else 
            cout<<"accept error!"<<endl;
        
        else 
        //不是监听套接字表明有数据的到来
          char buffer[10];//接收缓冲区很小
          int ret = recv(sock,buffer,sizeof(buffer)-1,0);
          if(ret > 0)
          
            buffer[ret-1] = '\\0';
            cout<<"recv from sock "<<sock<<" is "<<buffer<<endl;
          
          else if(ret == 0)
          
            cout<<"socket "<<sock<<" link is eixt..."<<endl;
            DeleteEvent(sock);
            close(sock);
          
          else 
          
            cout<<"recv error!"<<endl;
            DeleteEvent(sock);
            close(sock);
          

        

       
      
  
;
int main(int argc,char* argv[])

  if(argc!=2)
  
    cout<<"parameter error!!!"<<endl;
    exit(-1);
  
  EpollServer* ps = new EpollServer(atoi(argv[1]));
  ps->InitServer();
  ps->ServerStart();
  return 0;

运行结果

由于采用的LT触发,在接收的buf设置的很小的情况下,缓冲区数据没有一次性读完,所以可以看到该sock上的读事件触发了多次。

如果改为ET模式,这种情况就不会发生,只会触发一次。

只需要AddEvent中给sock额外添加EPOLLET选项。

不会把完整的数据拿上来,只通知一次,然而一次拿不完这么多字节。所以会存在数据丢失。


epoll的优点

1、支持一个进程打开大数目的socket描述符

这个是与select所对比。

2、IO效率不随FD数目增加而线性下降

这里测试结果参考Linux-Unix系统编程手册

 本质上是之前讲的两个原因

1、epoll只对活跃的文件描述符进行操作

2、避免频繁的用户空间与内核空间的拷贝

参考下面两篇的解释

 

以上是关于I/O多路复用之epoll的主要内容,如果未能解决你的问题,请参考以下文章

I/O多路复用之epoll

I/O多路复用之epoll

I/O多路复用之 epoll

I/O多路复用之epoll

I/O多路复用之epoll

高级I/O---多路复用---epoll