多路转接(IO复用)接口介绍

Posted 捕获一只小肚皮

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多路转接(IO复用)接口介绍相关的知识,希望对你有一定的参考价值。

文章目录

引言

此文仅仅作为多路转接api的介绍和使用,而对于五种IO模型的介绍可以移步概念,在概念中详细介绍了阻塞和非阻塞区别,多路转接概念和好处,异步IO等基本介绍.

select

函数声明

int select(int nfds, fd_set *readfds, fd_set *writefds,  fd_set *exceptfds, struct timeval *timeout);

参数以及类型介绍

参数nfds,整型,意义表示select所需要轮询的fd中的最大值加一.例如select需要轮询的fd有3,5,4,6,9,7,6,那么nfds = 9 + 1 = 10;至于为什么加一,理由很简单,其实就是循环的一个遍历界限,比如我要遍历5次,那么循环就是for(int i = 0;i<5;i++); 这里的fd集合中最大值为9,所以至少需要遍历0-9等10个位置,也就是遍历10次,这就是为什么加一


类型fd_set,是一种位图结构,用于表示某个值有与无,可以节省内存.假设位图能存储10个数据,现在该结构里面有4个数据:3,5,8,1.那么其结构图为:

而参数readfds,writefds,exceptfds则是输入输出型参数,它们分别表示所有需轮询读事假的fd,写事件的fd,异常事件的fd,而输入输出分别表示为:

  • 输入: 用户告诉内核,你帮我监听x,x,x,x等fd集合
  • 输出:内核告诉用户,你所关心的fd集合某部分已经就绪

也就是说,针对读(写,异常)事件来说,用户和内核使用的是同一个变量,以10位读事件位图结构为例,假设用户告诉内核需要监听1,2,3,5,6,7,8等fd,那么该结构中内容为:

而当输出时,内核告诉用户,现在你所关心的fd集合中,有3,7,8就绪了,请进行读取,那么该结构中内容就为:

我们能够清晰的发现原来所存储的数据1,2,5,6消失了,这将会造成一个非常严重的事情,下一次轮询,我们丢失了对该fd集合部分数据的健监控,因此,我们需要一个第三方数组,在每次轮询前进行记录所有监控的fd集合,以待下次轮询使用

而对于该位图结构,有专门的是个操作分别是:

void FD_CLR(int fd, fd_set *set);	// 用来清除描述词组set中相关fd 的位
int  FD_ISSET(int fd, fd_set *set);	// 用来测试描述词组set中相关fd 的位是否为真
void FD_SET(int fd, fd_set *set);  // 用来设置描述词组set中相关fd的位
void FD_ZERO(fd_set *set);       // 用来清除描述词组set的全部位为0

类型struct timeval其定义为:

struct timeval
  	__time_t tv_sec;     //seconds
 	__suseconds_t tv_usec; //microseconds
;


当参数timeout值为nullptr时候,执行1逻辑(阻塞等待)

当参数timeout值为0,0时候,执行2逻辑(非阻塞等待)

当参数timeout值为n,0时候,如果在小于n秒时候内有就绪事件,表示阻塞等待,等于n秒时候,表示在此时间内无事件就绪,大于n秒时候,表示非阻塞等待;


返回值

当返回值等于-1时候,表示有错误发生,错误原因存储与errno,此时参数readfds,writefds, exceptfds和timeout的值变成不可预测

当返回值等于0时候,表示在文件描述符状态改变前已超过timeout时间,没有返回事件

当返回值大于0时候,表示文件描述符状态改变的个数.


select代码演示

这里为了让大家熟悉select的使用,我们仅仅针对事件写了一个简单的select_server,并且只写框架伪码,想看代码请移步代码

//1. 全局定义第三方数组
fd_num[NUM];

int main()
    /* 2.网络套接字准备:socket,bind 和 listen,但是没有accept,因为它是阻塞等待,我们需要把此事交给select去做
    socket,bind,listen
    
    /* 3.定义读取事假位图,把监听套接字放进去,初始化全局数组*/
    fd_set rfds; fd_num[0]= listen_sock;  memset(fd_num,-1,sizeof(fd_num));  //-1代表此位置空,未占用
	
    /* 4.开始轮询 */ 
    for
        /*  5.每次轮询前初始化读事件位图 */  FD_ZERO(&rfds);
        
        /* 6. 把第三方数组存储的所有待监听fd放进 位图结构rfds,同时记录出最大fd,以待获取nfds值*/
        for(i<NUM)
            FD_SET(fd_num[i],&rfds);   nfds;
        
        /* 7. 定义deadline ,开始执行select ,通过返回值确定怎么处理*/
        int n = select(nfds,rfds,nullptr,nullptr,&timeout);
        if(n == -1) // error
        else if(n == 0)  //说明deadline内时间,无事件返回
        else
            /* 8. 对返回的rfds的所有位进行合法检测 */ 
            for()
                
                /*  9. 判断是否有监听套接字,如果有进行甄别,其余按照普通套接字fd处理 */ 
                if(==listen)
                    /* 如果是监听套接字,则获取新fd,然后插入第三方数组中的空位 */ 
                
                else
             		/* 如果是普通套接字则读取*/        
                
            
                
    
    return 0;

该代码可以通过改timeval值的方式体验阻塞,非阻塞轮询

优缺点

  • 优点:

    可以一次性等待多个fd,在一定程度上提升了IO效率

  • 缺点:

    位图结构fd_set容量受限,当fd数量足够大时候,将会造成满载

    底层需要轮询式的检查,当fd数量够多,会造成一定开销

    当fd数量足够多时候,可能会造成select在用户态和内核态之间频繁切换

    每次取得读事件后,又要重新循环设置位图结构的数据,比较繁琐

poll

函数声明

int poll(struct pollfd *fds, nfds_t nfds, int timeout);

参数以及类型介绍

类型struct pollfd,里面分别封装了一个文件描述符所对应的输入输出事件

struct pollfd
    int	fd;  		/* file descriptor */
    short events;  	/* requested events */
    short revents;	/* returned events */
;

events和revents本质来说也是一个位图结构,可以利用系统所定义的宏 或上 自己来进行表述监听读事件,还是写等事件.

系统定义的宏:

该结构体这样处理,有一个非常大的好处,那就是对用户告诉内核和内核告诉用户的切换进行解耦,因为通知这件事可以不用在同一个变量上进行.

参数fds是一个结构体数组指针,用于指向所需要监听的一批fd事件结构体struct pollfd;

参数nfds表示数组的长度,即有多少个元素

参数timeout表示时间,单位以毫秒为级别,时间意义和select一样

返回值

  • 返回值小于0, 表示出错;

  • 返回值等于0, 表示poll函数等待超时(即在timeout时间内未有事件就绪);

  • 返回值大于0, 表示poll由于监听的文件描述符就绪而返回.

poll代码演示

在知晓了poll的优越性后,我们对select的演示代码进行了优化(用poll替换),由于代码较长,可以移步代码查阅

优缺点

  • 优点

    不同与select使用三个位图来表示三个fdset的方式,poll使用一个pollfd的指针实现,

    pollfd结构包含了要监视的event和发生的event,不再使用select“参数-值”传递的方式. 接口使用比select更方便.

    poll并没有最大数量限制 (但是数量过大后性能也是会下降).

  • 缺点

    poll中监听的文件描述符数目增多时:

    和select函数一样,poll返回后,需要轮询pollfd来获取就绪的描述符.

    每次调用poll都需要把大量的pollfd结构从用户态拷贝到内核中.

    同时连接的大量客户端在一时刻可能只有很少的处于就绪状态, 因此随着监视的描述符数量的增长, 其效 率也会线性下降.

epoll

epoll是在poll的基础上进行更加改进了的接口,效率比较高.

epoll三调用

int epoll_create(int size);    //创建epoll模型
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);  //添加fd到epoll模型
int epoll_wait(int epfd, struct epoll_event * events, int maxevents, int timeout);    //通知就绪fd.

epoll_create函数用于创建epoll模型,方便后续使用;size代表在该模型中,内核返回批量就绪事件时,最多返回的数量

epoll_ctl函数用于对该模型进行添加,修改,删除等fd事件,第二个参数值一般有:

  • EPOLL_CTL_ADD : 注 册 新 的 fd 到 epfd 中 ;
  • EPOLL_CTL_MOD :修改已经注册的fd的监听事件;
  • EPOLL_CTL_DEL :从epfd中删除一个fd;

epoll_ctl的第三个参数代表是需要被监听的fd;

结构体epoll_event内容如下:

struct epoll_struct
    uint32_t events;    /*  epoll events */ 
    epoll_data_t data;  /*  user data variable */ 
;

typedef union epoll_data
    void* ptr;
    int fd;
    uint32_t u32;
    uint63_t u64;
epoll_data_t;

而其成员events可以是以下几个宏的成员:

  • EPOLLIN : 表示对应的文件描述符可以读 (包括对端SOCKET正常关闭);

  • EPOLLOUT : 表示对应的文件描述符可以写;

  • EPOLLPRI : 表示对应的文件描述符有紧急的数据可读 (这里应该表示有带外数据到来);

  • EPOLLERR : 表示对应的文件描述符发生错误;

  • EPOLLHUP : 表示对应的文件描述符被挂断;

  • EPOLLET : 将EPOLL设为边缘触发(Edge Triggered)模式, 这是相对于水平触发(Level Triggered)来说的.

  • EPOLLONESHOT:只监听一次事件, 当监听完这次事件之后, 如果还需要继续监听这个socket的话, 需要 再次把这个socket加入到EPOLL队列里.

epoll_wait函数用于通知事件准备就绪,第二个参数是一个数组指针,第三个参数代表数组元素数量,表示最多可以批量返回maxevents个事件,第四个参数用法和poll一样;其返回值表示**实际返回的事件数目,**如果小于0代表失败,0地表超时

简述epoll实现原理

epoll_create用于创建epoll模型,但是这个模型到底什么样呢?请看下图:

在这个模型里面有红黑树,回调机制,以及就绪队列;

红黑树的结点表示被监听fd的一个封装,当有海量连接时候,此结构可以快速查找,修改,删除;

回调机制是每个fd以及对应连接的某种操作,作用是把相关数据已经处理好的事件放进就绪队列;

就绪队列存储的是已经准备好的就绪事件;

简历好这个模型以后,就可以进行数据的修改监听了,这便是epoll_ctl 的作用(这里仅仅以读事件为例):

  • 新建一个红黑树结点对象并对fd封装;
  • 把该节点插入到红黑树里面;
  • 建立起新插入的结点fd和回调机制的联系;

知道了创建的模型以及相关的数据操作后,我们再看下其为什么高效,在讲解之前,先说一下外设数据存入内存的操作:

  • 当外设数据到达后,触发硬件中断信号,通知CPU进行处理
  • CPU收到信号,通过中断信号表执行相关处理程序,拷贝外设数据到内存
  • 处理完毕,CPU重新执行其他进程

当把相关事件数据拷贝到内存(准确说是待监听的等待队列中)后,OS并不知道是哪些事件准备就绪,只知道有数据准备好了,当有大批量链接时候,OS只能进行轮询遍历,这样效率会比较低下;

而epoll模型中的回调机制和就绪队列正是为该原因所准备.当OS拷贝完毕外设数据后,让该数据和回调机制产生关联,然后回调机制通过自身和fd的关联对数据进行检查处理,如果发现是就绪事件,直接将它扔到就绪队列.未就绪的不管(实际是扔回等待队列中);

因此epoll_wait返回就绪事件时,便可以直接从就绪队列中拿取,速度极快;

epoll代码演示

#include <iostream>
#include <string>
#include "Sock.hpp"
#include <sys/epoll.h>
#include <sys/select.h>
#include <unistd.h>
#include <cstdlib>

#define SIZE 128
#define NUM 64
/* user manual page */
void Usage(std::string proc)

    std::cout << "Usage : " << proc << " port " << std::endl;


int main(int argc, char *argv[])

    if (argc != 2)
    
        Usage(argv[1]);
        exit(1);
    

    struct timeval tl;

    /* Converts the string to an integer */
    uint16_t port = (uint16_t)atoi(argv[1]);

    /* Netwoek readiness*/
    int listen_sock = Sock::SOCKET();
    Sock::BIND(listen_sock, port);
    Sock::LISTEN(listen_sock);

    /* Accept call shouldn't be here,we need to delegate the wait operation to `epoll`*/

    /* create epoll mode, get file descriptor*/
    int epfd = epoll_create(SIZE);

    /* Converts the user mode to the kernel mode */
    struct epoll_event ctl_evt;
    ctl_evt.events = EPOLLIN;
    ctl_evt.data.fd = listen_sock;
    epoll_ctl(epfd, EPOLL_CTL_ADD, listen_sock, &ctl_evt);

    /*start cycle of events*/
    volatile bool quit = false;

    struct epoll_event wait_evt[NUM];
    while (!quit)
    
        int timeout = 1000;
        int n = epoll_wait(epfd, wait_evt, NUM, timeout);
        switch (n)
        
        case 0:
            std::cout << "time out. . . " << std::endl;
            break;
        case -1:
            std::cout << "epoll error. . ." << std::endl;
            break;
        default:
            std::cout << "events are ready. . . " << std::endl;
            for (int i = 0; i < n; i++)
            
                std::cout << "read events are ready..." << std::endl;
                /*  process read events */
                if (wait_evt[i].events & EPOLLIN)
                
                    /* process listen_sock linke */
                    if (wait_evt[i].data.fd == listen_sock)
                    
                         std::cout << "listen_sock:" << listen_sock << ",event  is ready" << std::endl;
                           /* process link events */
                        int fd = Sock::ACCEPT(listen_sock);
                        if (fd >= 0)
                        
                            struct epoll_event ctl_evt;
                            ctl_evt.events = EPOLLIN;
                            ctl_evt.data.fd = fd;
                            epoll_ctl(epfd, EPOLL_CTL_ADD, fd, &ctl_evt);
                            std::cout << "new fd:" << fd << ",is added the epoll mode" << std::endl;
                        

                    
                    else 
                    
                        /* process common read events */
                        std::cout << "common read events are ready" << std::endl;
                        char buffer[1024];
                        size_t s = recv(wait_evt[i].data.fd,buffer,sizeof(buffer)-1,0);
                        if(s > 0)
                            std::cout<<"=========== rechieve message ========= : "<<buffer<<std::endl;
                        
                        else if(s == 0)
                            std::cout<< "a client has closed,sock : "<<wait_evt[i].data.fd <<std::endl;
                            close(wait_evt[i].data.fd);
                            epoll_ctl(epfd,EPOLL_CTL_DEL,wait_evt[i].data.fd,nullptr);  /* remove closed fd from red and black tree */
                            std::cout<<"remove the fd success"<<std::endl;
                        
                        else 
                            std::cout<<"recv error "<<std::endl;
                            close(wait_evt[i].data.fd);
                            epoll_ctl(epfd,EPOLL_CTL_DEL,wait_evt[i].data.fd,nullptr);  /* remove closed fd from red and black tree */
                            std::cout<<"remove the fd success"<<std::endl;

                        
                    
                
                else if (wait_evt[i].events & EPOLLOUT)
                 /* process write events */

                
            
            break;
        
    
    close(epfd);
    return 0;

epoll工作方式

epoll有2种工作方式-水平触发(LT)和边缘触发(ET),介绍他们之前先看一下下面这个例子:

  • 我们已经把一个tcp socket添加到epoll描述符

  • 这个时候socket的另一端被写入了2KB的数据

  • 调用epoll_wait,并且它会返回. 说明它已经准备好读取操作

  • 然后调用read, 只读取了1KB的数据

  • 继续调用epoll_wait…

水平触发 Level Triggered 工作模式

  • epoll默认状态下就是LT工作模式.当epoll检测到socket上事件就绪的时候, 可以不立刻进行处理. 或者只处理一部分.

  • 如上面的例子, 由于只读了1K数据, 缓冲区中还剩1K数据, 在第二次调用 epoll_wait 时, epoll_wait仍然会立刻返回并通知socket读事件就绪.

  • 直到缓冲区上所有的数据都被处理完, epoll_wait 才不会立刻返回. 支持阻塞读写和非阻塞读写

边缘触发 Edge Triggered 工作模式

  • 如果我们在第1步将socket添加到epoll描述符的时候使用EPOLLET标志, epoll便进入ET工作模式.当epoll检测到socket上事件就绪时, 必须立刻处理.

  • 如上面的例子, 虽然只读了1K的数据, 缓冲区还剩1K的数据, 在第二次调用 epoll_wait 的时候,epoll_wait 不会再返回了.

  • 也就是说, ET模式下, 文件描述符上的事件就绪后, 只有一次处理机会.

  • ET的性能比LT性能更高( epoll_wait 返回的次数少了很多). nginx默认采用ET模式使用epoll.只支持非阻塞的读写

  • select和poll其实也是工作在LT模式下. epoll既可以支持LT, 也可以支持ET.

对比LT和ET

LT是 epoll 的默认行为. 使用 ET 能够减少 epoll 触发的次数. 但是代价就是强逼着程序猿一次响应就绪过程中就把 所有的数据都处理完.

相当于一个文件描述符就绪之后, 不会反复被提示就绪, 看起来就比 LT 更高效一些.

但是在 LT 情况下如果也能做到 每次就绪的文件描述符都立刻处理, 不让这个就绪被重复提示的话, 其实性能也是一样的.

另一方面, ET 的代码复杂程度更高了.

注意点

使用epoll的ET工作模式时,一般把文件描述符设置为非阻塞方式.

理由: 如果采用阻塞方式读取文件,容易造成进程一直阻塞,形成类似于线程死锁问题;

简单例子:

假设A给B发送了500字节,而缓冲区大小是100字节,于是B每次向缓冲区拿取100个字节,当取了五次以后,由于B并不知道A到底发送的多少字节,为了内容完整性,B会再向缓冲区申请拿取,但是A已经没有能发送的消息了,于是B的缓冲区一直为空,就造成了B一直等待拿取缓冲区数据,造成阻塞;进而被OS一直挂起;

以上是关于多路转接(IO复用)接口介绍的主要内容,如果未能解决你的问题,请参考以下文章

高效IO——多路转接之poll

[Linux] 典型IO模型与多路转接IO模型

[Linux] 典型IO模型与多路转接IO模型

[Linux] 典型IO模型与多路转接IO模型

binglinuxc(多路转接)

binglinuxc(多路转接)