高效IO——多路转接epoll

Posted 两片空白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高效IO——多路转接epoll相关的知识,希望对你有一定的参考价值。

目录

前言

一.epoll的相关调用

        1.1 epoll_create

        1.2 epoll_ctl

        1.3 epoll_wait

二.epoll工作原理

四.epoll优点

五.epoll的工作模式

         5.1 水平触发(Level Triggered)工作模式——LT

        5.2 边缘触发(Edge Triggered)工作模式——ET

        5.3 ET和LT对比

六.epoll使用


前言

        epoll是针对select和poll的缺点,再进行了进一步的改进。但是说明一点,epoll和select,poll的功能一样。可以同时等待多个文件。

        epoll最大的改变就是,不再需要用户来对要监视的文件描述符进行管理,而是交给系统来进行管理。而pol,select都需要定义一个数组来对要监视的文件描述符进行管理。

        多路转接适用于长连接的情况。

        短连接使用多路转接,短连接只是通信一次就关闭连接。导致使用多路转接还需要频繁的增加删除监视的文件,效率反而不高。

一.epoll的相关调用

select和poll都是使用一个系统调用接口,来实现要监视的文件的输入和输出。

  • 输入:用户通知内核要监视哪些文件的事件。
  • 输出:内核通知用户,那些文件描述符的事件就绪。

而epoll将输入/输出的功能分开了,是用了两个接口。epoll_ctr和epoll_wait。

        1.1 epoll_create

#include <sys/epoll.h>

int epoll_create(int size);

作用:在内核创建一个epoll模型。epoll模型后面有介绍。

参数:size现在已经忽略,可以随便设置。

返回值:返回一个文件描述符,该文件描述的指向的是epoll模型。通过该文件描述符来对epoll模型进行管理。

注意:使用完epoll模型,需要调用close进行关闭。

        1.2 epoll_ctl

#include <sys/epoll.h>

int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);

作用:用户通知内核,需要在epoll模型中添加/删除/修改文件的事件。

参数:

参数含义
epfdepoll_create返回值
op进行什么操作,用三个宏表示
fd需要监听的文件的文件描述符
event用户告诉内核需要监听什么事

返回值:成功返回0,失败返回-1。

第二个参数op的三个宏:

  • EPOLL_CTL_ADD:注册一个新的文件到epoll模型中
  • EPOLL_CTL_MOD:修改一个已经注册的文件描述符的监听事件
  • EPOLL_CTL_DEL:从epoll模型中删除一个监听的文件。

第四个参数,struct epoll_event结构:

typedef union epoll_data {
	void        *ptr;
	int          fd;
	uint32_t     u32;
	uint64_t     u64;
} epoll_data_t;

struct epoll_event {
	uint32_t     events;      /* Epoll events */
	epoll_data_t data;        /* User data variable */
};

events可以是以下几个宏的集合:

想对一个文件监听多个事件,可以使用多标志位法。多个宏按位或。

  • EPOLLIN:表示对应文件描述符可读,包括对端SOCKET正常关闭
  • EPOLLOUT:表示对应文件描述符可写。
  • EPOLLPRI:表示对应文件描述符有紧急数据可读。
  • EPOLLERR:表示对应文件描述符发生错误。
  • EPOLLHUP:表述对应文件描述符被挂断。
  • EPOLLET:将epoll设为边缘触发模式,这是相对于水平触发模式说的。
  • EPOLLONESHOT:只监听一次事件,当监听完这次事件后,如果需要继续监听这个socket的话,需要再此报这个socket加入到epoll模型中。

主要使用:EPOLLIN和EPOLLOUT两个。

data:是一个联合体,可以用来用户定义。

        1.3 epoll_wait

#include <sys/epoll.h>

int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);

作用:等待某个文件的事件就绪。

参数:

参数含义
epfdepoll模型
events输出形参数,内核告诉用户,文件的哪些事件就绪,指针指向struct epoll_events数组,不可以是空指针。
maxeventsevents大小,要小于epoll_create参数size的大小
timeout等待时间,单位毫秒。0:非阻塞等待,-1:阻塞等待,具体某一值:具体等待时间。

返回值:

  • 大于0:已经就绪好的文件描述符的个数
  • 等于0:超时
  • 小于0:函数调用失败

参数struct epoll_event *events指向struct epoll_event数组,数组大小是我们用户设定的。

内核会将就绪文件的事件从数组0号下标开始往后放。因此如果有文件事件就绪,epoll_wait返回值num就代表,有从0开始到num之间个文件事件就绪。

但是我们用户并不知道内核在监听多少文件,有多少文件就绪,当我们数组大小比就绪的文件描述符个数小时,会不会有问题?

        不会有问题,没有处理的文件的事件,下一次还会是就绪的,下一次会处理。

二.epoll工作原理

首先说明:

操作系统时如何知道收到数据了?

        一般硬件可以产生中断,操作系统收到中断,根据对应的中断号,执行对应的操作。

例如:网卡收到数据,会形成中断,OS收到中断,会执行对应中断号的操作。

select和poll是如何知道监听的文件的事件就绪?

        操作系统通过轮询检测所有监听的文件。就可以知道哪些文件的事件就绪了。

epoll是如何监听到文件事件的就绪?

        epoll模型中有一种回调机制,这个回调机制是操作系统运用驱动层的功能,使得当某个文件的事件就绪时会通知操作系统,OS系统就会去执行回调函数。

epoll模型里包含什么?

        epoll模型了包含一个红黑树,一个就绪队列和回调机制。

  • 红黑树:保存了用户通知内核,需要监视文件的哪些事件。红黑树的节点里面保存了文件的文件描述和具体事件。红黑树保存的时键值对,键值key用文件描述符合适,不会重复。
  • 就绪队列:里面保存的是就绪的事件。每一个节点代表就绪的事件。
  • 回调机制:当某个文件事件就绪,操作系统通过回调机制,调用对应文件描述符的回调函数,形成一个就绪队列的节点,并连接到就绪队列中。

 调用epoll_create的工作:

  1. 创建红黑树
  2. 创建回调机制
  3. 创建就绪队列
  • 在内核中epoll模型的结构体为eventpoll,每调用一次epoll_create,就创建一一个epoll模型,也就在内核创建了一个eventpoll结构体。该结构体包含两个重要成员。
struct eventpoll{
    ....
    /*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
    struct rb_root rbr;
    /*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
    struct list_head rdlist;
    ....
};

调用epoll_ctl工作:

  1. 操作红黑树。删除/修改/增加红黑树的节点。红黑树的节点,代表监视的事件。
  2. 形成对应文件描述符的回调函数。当事件就绪,调用回调函数,形成就绪队列节点,连接到就绪队列中。

调用epoll_wait工作:

  1. 首先检测就绪队列是否空。
  2. 不为空,将就绪队列里的就绪事件信息拷贝到用户缓冲区,并且从用户缓冲区数组0号下标开始往后放。

select和poll是通过轮询检测所有要监视的文件,检测是否有文件就绪。事件复杂度为O(n)。并且监视文件越多,效率越低。

epoll检测是否有文件就绪,时间复杂度为O(1)。只需要检测就绪队列是否不为空。

对于海量需要监视的文件,监视文件多了,就绪的概率也就高了。select,poll的效率会明显降低,需要频繁轮询检测监视的文件,并且每次调用select/poll需要将大量的文件描述符,从用户拷贝到内核。但是对于epoll,只需要检测就绪队列即可,并数据轻量级拷贝,不需要将需要监视的文件从用户重新拷贝到内核。

四.epoll优点

  • 接口使用方便:虽然拆分成了3个接口,不需要每次循环重新设置关注的文件描述符,也做到输入输出参数的分离。
  • 数据拷贝轻量化:不需要每次调用epoll都要从小将监视的文件重新从用户拷贝到内核,只需要调用epoll_ctl增加.删除.修改即可。select和poll每次调用都要重新将监视的文件重新从用户拷贝到内核。
  • 回调机制:想知道文件是否就绪,不需要轮询检测,使用回调的方式,将就绪事件加入到就绪队列中,想知道是否有文件就绪,只需要检测就绪队列是否不为空。时间复制度为O(1)。并且不会因为文件的增多而效率降低。
  • 监视的文件没有数量限制。

五.epoll的工作模式

epoll有两种工作模式,水平触发(LT)工作模式和边缘触发(ET)工作模式。

实际select和poll只支持LT工作模式,epoll即支持LT,也支持ET工作模式。

举个例子:

你妈妈叫你吃饭。

你妈妈叫你一次,你没有动,你妈妈会继续一直叫你吃饭。对应LT工作模式。

你妈妈叫你一次,你没有动,你妈妈之后不会再叫你了。对应ET工作模式。

         5.1 水平触发(Level Triggered)工作模式——LT

epoll 默认情况下就是LT工作模式。

  • 当epoll检测到socket上事件就绪,可以不立刻处理,或者只处理一部分。
  • 下一次epoll仍然会认为该事件就绪。
  • 直到事件被处理完。
  • 支持阻塞和非阻塞读写。

        5.2 边缘触发(Edge Triggered)工作模式——ET

当我们再添加文件到epoll模型中时,将事件events设置为EPOLLET事件,epoll对此文件为ET工作模式。

  • 当epoll检测到文件事件就绪,必须立刻处理完,因为该文件的事件下一次就不会是就绪了。
  • 也就是说,ET模式下,文件描述符上的事件就绪后,只有一次处理机会。
  •  只支持非阻塞读写。(原因在下面)

工作再ET模式下,有一个问题:

        当每次读数据的个数比发过来的数据个数少,由于该文件事件只就绪一次,导致没有读取的数据,后面不会就绪了,就丢失了。

比如:实际发过来1024字节,每次读215字节,剩下的512字节,由于下次不会就绪,不会读取了。剩下的512字节相当于丢失了。

        所以我们需要一次性读取完发来的整个数据。

怎么知道读完了整个数据?

        循环读取,当读到的数据个数等于我们期望的数据个数,说明后面可能还有数据,继续读;当实际读到的数据个数小于我们期望读取的数据个数时,肯定说明读完了发来的整个数据。

为什么只支持非阻塞读写?

        但是当循环读取,下一次读取的数据为0时,再读数据时,会发生阻塞。

比如:发送2560字节,调用read,每次期望读取512字节,当循环读取5之后,第6次缓冲区里没有数据,说明该事件不就绪了,然而会再次调用read(因为上一次读到了期望的字节数),就会发生阻塞。

        所以为了避免这种情况,ET模式只能用于非阻塞的读写。

如何让设置文件非阻塞?

        调用fcntl接口。

        5.3 ET和LT对比

  • ET效率会比LT效率更高,epoll_wait返回次数少,但是这样也需要程序员一次响应就绪过程把发送的数据全部处理完。
  • ET代码复杂度更高了。
  • 但是如果LT工作模式下,响应就绪过程一次性也能将所有数据读取完,两者效率是差不多的。

六.epoll使用

  • epoll_wait参数struct epoll_event是将就绪队列的信息拷贝到参数中,不需要再进行参数化。
  • 客户端发送数据,一次性可能没有发送往数据,可能要发多次。之前再写select和poll服务器时,都有一个bug,读数据的缓冲区时一个局部变量,不能保存上一次发送来的数据,所以不同的文件需要其对应的缓冲区来来保存数据。
  • 代码中定义了一个Bucket桶,来保存数据,每一个监视的文件都有一个自己的Bucket。用struct epoll_event的data里的ptr指向,这样就可以做到,既可以保存之前那的数据,还可以接受完发送的数据。
  • 代码中bucket是用一个定长数组来接收,在实际应用中我们可以定义成request,返回时返回response。

Sock.hpp

#pragma once 

#include <iostream>
#include <stdlib.h>
#include <unistd.h>

#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <sys/epoll.h>

#define BACKLOG 5
using namespace std;

class Sock{
  public:
    static int Socket(){
      int sock = 0;
      sock = socket(AF_INET, SOCK_STREAM, 0);
      if(sock < 0){
        cerr<<"socket error"<<endl;
        exit(1);
      }
      return sock;
    }
    static void Bind(int sock, int port){
      struct sockaddr_in local;
      local.sin_family = AF_INET;
      local.sin_port = htons(port);
      local.sin_addr.s_addr = htons(INADDR_ANY);
      if(bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0){
        cerr << "bind error"<<endl;
        exit(2);
      }
    }

    static void Listen(int sock){
      if(listen(sock,BACKLOG) < 0){
        cerr << "listen error"<<endl;
        exit(3);
      }
    }
    static int Accept(int sock){
      struct sockaddr_in peer;
      socklen_t len = sizeof(peer);
      return accept(sock, (struct sockaddr *)&peer, &len);
    }
    static void SetSockOpt(int sock){
      int opt =1;
      setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
    }
};

        EpollServer.hpp

#pragma once 

#include "Sock.hpp"
#define NUM 20

class Bucket{
  public:
    Bucket(int fd)
    :_fd(fd)
    ,_pos(0)
  {
    _buff[0] = 0;
  }
    char _buff[20];//数据缓冲区
    int _fd;//文件描述符
    size_t _pos;//从pos位置开始保存到缓存区
};

class EpollServer{

  private:
    int _lsock;
    int _port;
    int _efd;

  public:
    EpollServer(int lsock = -1, int port = 8080, int efd = -1)
      :_lsock(lsock)
      ,_port(port)
      ,_efd(efd)
    {}
    //将sock添加到epoll模型的红黑树中
    void AddSock2Epoll(int sock, int event){
      struct epoll_event ev;
      ev.events = event;
      if(sock == _lsock){
		  //连接套接字不需要开辟桶空间
        ev.data.ptr = nullptr;
      }
      else{
        //Bucket保存数据
        ev.data.ptr = new Bucket(sock);

      }
      if(epoll_ctl(_efd, EPOLL_CTL_ADD, sock, &ev) < 0){
        cerr << "Add error , close sock"<<endl;
        close(sock);
      }
    
    }
    void DelSock2Epoll(int sock){
      epoll_ctl(_efd, EPOLL_CTL_DEL, sock, nullptr);

    }
    void InitServer(){
      _lsock = Sock::Socket();
      Sock::SetSockOpt(_lsock);
      Sock::Bind(_lsock, _port);
      Sock::Listen(_lsock);
      //创建epoll模型
      _efd =  epoll_create(256);
      if(_efd < 0){
        cerr << "epoll create error"<<endl;
        exit(5);
      }
      
      AddSock2Epoll(_lsock, EPOLLIN);

    }
    void Handler(struct epoll_event revent[], int n){
      for(int i =0; i < n; i++){
        if(revent[i].events & EPOLLIN){
          //读
          //连接套接字没有创建缓冲区
          if(revent[i].data.ptr == nullptr){
            //有连接
            int sock = Sock::Accept(_lsock);
            if(sock < 0){
              cerr << "accpet error"<<endl;
            }
            else{
              //加入Epoll模型
              cout<<"get a link..."<<endl;
              AddSock2Epoll(sock, EPOLLIN);
            }

          }
          else{
            Bucket *bk = (Bucket *)revent[i].data.ptr;
            //IO就绪
            int s = recv(bk->_fd, bk->_buff + bk->_pos, sizeof(bk->_buff)- bk->_pos, 0);
            if(s > 0){
              bk->_pos += s;
              if(bk->_pos >= sizeof(bk->_buff)){
                bk->_buff[sizeof(bk->_buff) -1] = 0;

              }
              else{
                bk->_buff[bk->_pos] = 0;
              }
              cout << "client# "<<bk->_buff<<endl;
              if(bk->_pos >= sizeof(bk->_buff)){
              //修改事件为写就绪
                revent[i].events = EPOLLOUT;
                epoll_ctl(_efd, EPOLL_CTL_MOD, bk->_fd, &revent[i]);

              }
            }
            else if(s == 0){
              cerr << "client close..."<<endl;
              close(bk->_fd);
              DelSock2Epoll(bk->_fd);
              //销毁bk开辟的空间,防止内存泄漏
              delete bk;
            }
            else{
              cerr << "recv error"<<endl;
            }
          }

        }
        else if(revent[i].events & EPOLLOUT){
          //写
          Bucket *bk = (Bucket *)revent[i].data.ptr;
          bk->_buff[sizeof(bk->_buff) -1] = '\\n';
          send(bk->_fd, bk->_buff, sizeof(bk->_buff), 0);
          //关闭连接
          DelSock2Epoll(bk->_fd);
          close(bk->_fd);
          delete bk;
        }
        else{
          //其它事件

        }
      }
    }
    void Start(){
      struct epoll_event revent[NUM];
      int timeout = -1;
      while(1){
        int n =epoll_wait(_efd, revent, NUM, timeout);
        switch(n){
          case -1:
            cerr << "epoll wait error" <<endl;
            break;
          case 0:
            cerr << "epoll timeout" <<endl;
            break;
          default:
            Handler(revent,n);
            break;

        }

      }
    }
    ~EpollServer(){
      close(_lsock);
      close(_efd);

    }
};

EpollServer.cc 

#include"EpollServer.hpp"

void Notice(){
  cout<<"Notice:\\n\\t"<<"please port"<<endl;
}

int main(int argc, char *argv[]){
  if(argc != 2){
    Notice();
    exit(4);
  }
  EpollServer *es = new EpollServer(atoi(argv[1]));
  es->InitServer();
  es->Start();
  delete es;

  return 0;
}

以上是关于高效IO——多路转接epoll的主要内容,如果未能解决你的问题,请参考以下文章

五种高阶IO模型以及多路转接技术(selectpoll和epoll)及其代码验证

五种高阶IO模型以及多路转接技术(selectpoll和epoll)及其代码验证

多路转接(IO复用)接口介绍

Linux & IO多路转接——epoll详解

Linux & IO多路转接——epoll详解

IO多路转接 ——— selectpollepoll