五种高阶IO模型以及多路转接技术(selectpoll和epoll)及其代码验证

Posted It‘s so simple

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了五种高阶IO模型以及多路转接技术(selectpoll和epoll)及其代码验证相关的知识,希望对你有一定的参考价值。


1. 五种高阶IO模型

首先我们要明确的是在任何IO操作中,均包含两个步骤,等待和拷贝,而在实际的业务中,等待所消耗的时间往往大于拷贝的时间,因此,让IO操作更高效,核心的方法就是将等待的时间缩短。

低阶IO:是指类似于将用户输入的内容读取到某个变量中,将变量中的值打印在屏幕上等等,简单来说就是对C库自己所维护的缓冲区进行I/O操作。

高阶IO:通常应用于网络Socket编程,对UDP(TCP)所维护的发送缓冲区和接收缓冲区进行I/O操作。并且高阶IO分为同步IO异步IO,同步IO又分为阻塞IO、非阻塞IO、信号驱动IO和多路转接IO

1.1 阻塞IO

阻塞IO:在内核将数据准备好之前(等待+拷贝),系统调用会一直进行等待。并且所有的套接字默认均是阻塞方式。

1.2 非阻塞IO

非阻塞IO:若当前内核没有将数据准备好,则系统调用会直接返回,并返回一个EWOULDBLOCK错误码。
非阻塞IO一般都是搭配循环来使用的(也叫轮询),这对系统资源是较大的浪费,一般都是在特定的场景下使用。

扩展:将一个文件描述符设置为非阻塞属性。

int fcntl(int fd, int cmd, ... /* arg */ ),处于#include <fcntl.h>

  • 首先通过给cmd传入F_GETFL宏来获取到当前文件描述符的属性
  • 其次再通过获得的属性按位或上O_NONBLOCK属性,将文件描述符设置为非阻塞属性

代码如下:

int f1 = fcntl(fd,F_GETFL);
if(f1 < 0)
{
	perror("fcntl");
	return ;
}
fcntl(fd,F_SETFL, f1 | O_NONBLOCK);

1.3 信号驱动IO

信号驱动IO:当内核将数据准备好之后或者说告诉应用进程何时才可以开始拷贝数据,会给应用进程发送一个SIGIO的信号,通知其进行IO操作。当应用程序接收到该信号之后,证明数据已经准备好了,接下来就会调用系统调用函数对其进行相应的IO操作

1.4 多路转接IO

多路转接IO:在流程上与阻塞IO类似,但是其本质上最核心的地方就是可以一次性等待多个文件描述符的就绪状态

1.5 异步IO

异步IO:当内核将数据拷贝好之后,通知应用进程进行相关的IO操作。需要注意的是他和信号驱动IO很像,但是信号驱动IO是内核将数据准备好后才通知,而异步IO是内核将数据拷贝好后才通知

注:为了性能和效率的优先,C++默认采用的是异步IO的方式。

2. 多路转接技术(select、poll、epoll)

首先我们要知道多路复用函数的作用是什么,其本质上就是让内核帮助程序员监控多个文件描述符的IO事件,一旦监控的某个文件描述符对应的事件产生(IO就绪),就会通知调用者,也就是说可以并行的处理多条客户端的请求,换句话说就是实现了高并发。

2.1 select函数

作用:监控多个文件描述符,就绪之后,通知调用者。

2.1.1 select 函数详解

  • nfds:select监控事件集合(fd_set)的范围,范围是从[0,1023]之间去进行选择范围。nfds的取值为:监控的最大文件描述符数值+1
  • fd_set:事件集合类型

readfds:可读事件结合
writefds:可写事件集合
exceptfds:异常事件集合



并且内核在使用该数组的时候采用的是位图的方式,因此总共有1024个比特位

  • timeout:

NULL:阻塞。
0 :非阻塞。
传递一个struct timeval:代表着带有超时时间的方式。

返回值:

  • 监控成功:返回就绪的文件描述符个数

监控成功会返回就绪的文件描述符个数,并且在返回的时候,会将事件集合中未就绪的文件描述符去除掉

  • 监控失败:返回-1

参数含义补充:

  • 如果关心某个文件描述符对应的某种事件,则将文件描述符添加到对应的事件集合当中去。例如:关心3号文件描述符的可读事件,则将3号文件描述符添加到对应的readfds事件中。
  • 添加文件描述符到事件集合的时候,就是将文件描述符对应的比特位设置为1。
  • 如果一个文件描述符关心多种事件(可读、可写、异常),则将文件描述符添加到不同的事件集合当中去。
  • select的监控效率会随着文件描述符的增多而下降,本质原因就是由于监控轮询的范围变大了。
  • 对相应事件集合的操作:

2.1.2 select函数优缺点

优点:

  • According to POSIX.1-2001,select遵循的是POISX标准,说明select函数是一个跨平台的函数,既可以在linux中运行,也可以在win中运行。
  • select在带有超时时间监控的时候,超时时间单位可以是微秒。

缺点:

  • 监控文件描述符的个数最多为1024个
  • 随着监控文件描述符的增多,监控的效率在逐渐的下降(本质上是select在轮询的进行监控)
  • 可读、可写、异常这些事件需要添加到单独的添加到不同的事件集合中
  • 当select监控成功之后,会从事件集合当中去除未就绪的文件描述符,这表明程序在下一次运行的时候,需要重新添加文件描述符。
  • 在每次select进行监控的时候,都会将准备好的事件集合拷贝到内核空间,select返回的时候,都会相应的事件集合从内核空间再拷贝到用户空间。

2.2 poll函数

前提:均是监控多个文件描述符,就绪之后,然后通知调用者。与select相比,不支持跨平台,与epoll相比,效率没有epoll高。

2.2.1 poll函数详解

  • struct pollfd*: 事件集合结构


想让poll监控几个元素,只需要在定义事件结构数据的时候,多传递几个元素。

  • nfds:事件结构数组中的有效元素的个数
  • timeout

>0:带有超时事件,单位秒
==:非阻塞
<0:阻塞

返回值:返回就绪的文件描述符个数

2.2.2 poll函数优缺点

优点:

  • 提出了事件结构的方式,在给poll函数传递参数的时候,不需要分别添加到事件集合当中。
  • 事件结构数组的大小可以根据程序员自己进行定义,并没有上限的要求。
  • 不用再监控到就绪之后,重新添加文件描述符。

缺点:

  • 不支持跨平台。
  • 内核对事件结构数组进行监控的时候也采用的是轮询遍布的方式。

2.3 epoll函数

epoll函数是目前世界上公认在Linux下,多路转接监控效率最高的模型

2.3.1 epoll相关函数详解

① 创建epoll操作句柄

  • size:自从Liunx2.6.8之后,size参数是被忽略的,但是不要传递一个小于0的数字。

返回值:返回epoll的操作句柄。

② 注册待要监控的文件描述符

  • epfd:epoll操作句柄
  • op

  • fd:待处理(添加、修改、删除)的文件描述符
  • event:文件描述符对应的事件结构

返回值:

③ epoll的等待接口

  • epfd:epoll的操作句柄
  • events:类型同 epoll_ctl 中一样,只不过这里的是事件集合数组,从epoll当中获取就绪的事件结构
  • maxevents:最多一次获取多少个事件结构
  • timeout

>0:带有超时事件,单位秒
==:非阻塞
<0:阻塞

返回值:就绪的文件描述符个数

2.3.2 epoll工作原理

当某一个进程调用epoll_create函数时,LInux内核会创建一个eventpoll的结构体,这个结构体中有两个成员与epoll的使用方式密切相关。

简单概述一下:

每当调用epoll_create函数的时候,内核就会创建一个eventpoll的结构体,在该结构体中有两个成员,类型分别为红黑树和双向链表,当调用epoll_ctl函数进行添加、修改或删除时,本质上就是对该红黑树进行添加、修改和删除,而每一个添加进来的红黑树结点均会和设备(网卡)驱动程序建立一个回调关系(这个回调函数为ep_poll_callback),当某个文件描述符就绪之后,他会调用这个回调函数将该就绪的事件结构添加到双向链表当中,而当调用epoll_wait进行监控的时候,如果双向链表为空,则表明当前没有就绪的事件发生,如果不为空,则将双向链表中的内容复制到用户态,并返回将事件数量返回给用户

注意:这里的双向链表其实实现的是一个队列,虽然是一个双向链表,但是他只支持先进先出(FIFO),是队列的特性。

2.3.3 epoll的优点

  • 没有数量的限制,文件描述符的数量为内核所支持的最大上限(65536)
  • 事件回调机制,当文件描述符就绪之后,会调用回调函数将事件结构复制到双向链表中,调用epoll_wait返回时,直接访问就绪队列就可以知道有多少文件描述符就绪,这个操作的时间复杂度为O(1),即使文件操作符很多,也不会受到影响。
  • 数据拷贝是轻量的,只有在合适的时候调用EPOLL_CTL_ADD将对应的文件描述符接口拷贝到内核中。这个操作并不频繁(select/poll每次都要循环的进行拷贝)。

select、poll、epoll对比

2.3.4 LT模式和ET模式

举个例子:当你在中午饭点玩游戏的时候,如果这个时候饭刚好做好了。
LT:家里人第一次通知的时候,你没有管,那他们还会通知第二次、第三次…
ET:家里人在第一次通知的时候,你没有管,那么他们就不会在通知你了。

① LT(Level Triggered) 水平触发工作模式

在LT模式下,当epoll检测到事件就绪的时候,可以不处理或处理一部分,但是可以连续多次调用epoll_wait对事件进行处理,简单点来说的话就是如果事件来了,不管来了几个,只要仍然有未处理的事件,epoll都会通知你

② ET(Edge Triggered) 边缘触发工作模式

在ET模式下,当epoll检测到事件就绪的时候,会立即进行处理,并且只会处理一次,换句话说就是文件描述符上的事件就绪之后,只有一次处理机会。 简单来说就是如果事件来了,不管来了几个,你若不处理或者没有处理完,除非下一个事件到来,否则epoll将不会再通知你。

LT模式存在的问题:

如果可读或者可写事件未进行处理,会频繁反复的激活未处理事件

解决:

在不想处理某个事件的时候就将它从epoll中移除,需要时再添加上

ET模式存在的问题:

如果可读或者可写事件没有全部处理,会有老数据残留,需要等待新数据的到来才会被处理

解决:

  • 循环读取或者写入数据,直至返回值未EAGAIN或者EWOULDBLOCK(循环调用)
  • 读取或写入数据后,通过epoll_ctl设置EPOLL_CTL_MOD,激活未处理事件(相当于将当前未处理事件设置未新事件)

3. select、poll、epoll的代码验证

3.1 select代码验证

3.1.1 select 简单验证

先举个简单的例子,利用select函数对系统的0号文件描述符(读缓冲区)进行监控,一旦监控到读的事件,则将其读入的内容打印到屏幕上。

#include <unistd.h>
#include <sys/select.h>
#include <iostream>

using namespace std;

#define nfds 1

int main()
{
    fd_set readfds;
    FD_ZERO(&readfds);
    FD_SET(0,&readfds);
    while(1)
    {
        fd_set tmp = readfds;
        int ret = select(nfds,&tmp,NULL,NULL,NULL);
        if(ret < 0)
        {
            cout << "select failed" << endl;
            return 0;
        }
        for(int i = 0; i < nfds; ++i){
            if(FD_ISSET(0,&tmp))
            {
                char buf[1024] = {0};
                read(0,buf,sizeof(buf)-1);
                cout << buf << endl;
            }
        }
    }
    return 0;
}

结果验证

3.1.2 使用select函数解决TCP单进程阻塞问题

在TCP单进程的条件下,存在于将accept函数放到while循环内部或外面所造成的问题

内部:每次接收都是新的连接,和每个客户端只能聊一次(accept函数阻塞)
外部:只能接收一个客户端的连接(recv函数阻塞)

更为详细的请看:Linux:TCP Socket编程(代码实战)

解决代码如下:
为了更方便使用,我们将selecct函数的相关操作封装为一个类

#include <sys/select.h>
#include <stdio.h>
#include <iostream>
#include <vector>

using namespace std;

class SocketSelect{
    public:
        SocketSelect()
        {
            FD_ZERO(&readfds_);
            nfds_ = 0;
        }

        void SetFd(int fd)
        {
            FD_SET(fd,&readfds_);

            if(fd > nfds_)
                nfds_ = fd;
        }

        void RmFd(int fd)
        {
            FD_CLR(fd,&readfds_);

            for(int i = nfds_; i >= 0; ++i)
            {
                if(FD_ISSET(i,&readfds_))
                {
                    nfds_ = i;
                    break;
                }
            }
        }

        int SelectWait(vector<int>& iv)
        {
            fd_set tmp = readfds_;
            int ret = select(nfds_+1,&tmp,NULL,NULL,NULL);
            if(ret < 0)
            {
                printf("select failed\\n");
                return -1;
            }

            for(int i = 0; i <= nfds_; ++i)
            {
                if(FD_ISSET(i,&tmp))
                    iv.push_back(i);
            }

            return ret;
        }

        ~SocketSelect()
        {
            FD_ZERO(&readfds_);
        }
    private:
        int nfds_;
        fd_set readfds_;
};

服务端代码:

#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>

#include "ssocket.hpp"

using namespace std;

int main()
{
    int sockfd = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    if(sockfd < 0)
    {
        cout << "socket failed" << endl;
        return 0;
    }
    
    sockaddr_in addr;
    addr.sin_port = htons(18989);
    addr.sin_family = AF_INET;
    addr.sin_addr.s_addr = inet_addr("0.0.0.0");
    int ret = bind(sockfd,(struct sockaddr *)&addr,sizeof(addr));
    if(ret < 0)
    {
        close(sockfd);
        cout << "bind failed" << endl;
    }

    ret = listen(sockfd,5);
    if(ret < 0)
    {
        close(sockfd);
        cout << "listen failed" << endl;
    }

    SocketSelect ss;
    ss.SetFd(sockfd);

    while(1)
    {
        vector<int> iv;
        ret = ss.SelectWait(iv);
        if(ret < 0)
        {
            continue;
        }

        for(size_t i = 0; i < iv.size(); ++i)
        {
            if(iv[i] == sockfd)
            {
                //new con
                int newsockfd = accept(iv[i],NULL,NULL);
                if(newsockfd < 0)
                {
                    cout << "accept failed" << endl;
                    continue;
                }

                ss.SetFd(newsockfd);
            }
            else
            {
                //recv
                char buf[1024] = {0};
                ssize_t recv_size = recv(iv[i],buf,sizeof(buf)-1,0);
                if(recv_size < 0)
                {
                    cout << "recv failed" << endl;
                    continue;
                }
                else if(recv_size == 0)
                {
                    cout << "peer shutdown" << endl;
                    ss.RmFd(iv[i]);
                    close(iv[i]);
                    continue;
                }
                else
                {
                    cout << buf << endl;
                }
            }
        }
    }
    
    close(sockfd);
    return 0;
}

客户端代码:

#include <unistd.h>
#include <sys/socket.h>
#include <arpa/inet.h>
#include <netinet/in.h>
#include <string.h>

#include <iostream>

using namespace std;

int main()
{
    int sockfd = socket(AF_INET,SOCK_STREAM,IPPROTO_TCP);
    if(sockfd < 0)
    {
        cout << "socket failed" << endl;
        return 0;
    }

    sockaddr_in addr;
    addr.sin_family = AF_INET;
    addr.sin_port = htons(18989);
    addr.sin_addr.s_addr = inet_addr("118.89.67.215");
    int ret = connect(sockfd,(struct sockaddr *)&addr,sizeof(addr));
    if(ret < 0)
    {
        cout << "connect failed" << endl; 
    }

    while(1)
    {
     	//char buf[1024] = "i am client1\\n";
        char buf[1024] = "i am clientxxxxx\\n";
        ssize_t send_size = send(sockfd,buf,strlen(buf),0);
        if(send_size < 0)
        {
            cout << "send failed" << endl;
            continue;
        }
        sleep(1);
    }
    return 0;
}

运行结果:


完美解决了单进程下两个阻塞的问题。

3.1.3 在多线程下调用select函数

在多线程下调用select函数(是多个线程共同侦听同一个可读事件集合)

#include <unistd.h>
#include <sys/select.h>
#include <pthread.h>

#include <iostream>

using namespace std;

void* PthreadEntry(void* arg)
{
    pthread_detach(pthread_self());
    fd_set *readfds = (fd_set*) arg;
    
    while(1)
    {
        int ret = select(1,readfds,NULL,NULL,NULL);
        if(ret < 0)
        {
           cout << "select failed" << endl;
           continue;
        }

        if(FD_ISSET(0,readfds))
        {
            char buf[1024] = {0};
            read(0,buf,sizeof(buf)-1);
            cout << buf << endl;
        }
    }

    return NULL;
}

int main()
{
    pthread_t pid;
    
    fd_set readfds;
    FD_ZERO(&readfds);
    FD_SET(0,&readfds)<

以上是关于五种高阶IO模型以及多路转接技术(selectpoll和epoll)及其代码验证的主要内容,如果未能解决你的问题,请参考以下文章

五种高阶IO模型以及多路转接技术(selectpoll和epoll)及其代码验证

五种IO模型与多路转接

五种IO模型与多路转接

五种IO模型与多路转接

高级IO模型

高级IO模型