高效IO——多路转接epoll
Posted 两片空白
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了高效IO——多路转接epoll相关的知识,希望对你有一定的参考价值。
目录
5.1 水平触发(Level Triggered)工作模式——LT
5.2 边缘触发(Edge Triggered)工作模式——ET
前言
epoll是针对select和poll的缺点,再进行了进一步的改进。但是说明一点,epoll和select,poll的功能一样。可以同时等待多个文件。
epoll最大的改变就是,不再需要用户来对要监视的文件描述符进行管理,而是交给系统来进行管理。而pol,select都需要定义一个数组来对要监视的文件描述符进行管理。
多路转接适用于长连接的情况。
短连接使用多路转接,短连接只是通信一次就关闭连接。导致使用多路转接还需要频繁的增加删除监视的文件,效率反而不高。
一.epoll的相关调用
select和poll都是使用一个系统调用接口,来实现要监视的文件的输入和输出。
- 输入:用户通知内核要监视哪些文件的事件。
- 输出:内核通知用户,那些文件描述符的事件就绪。
而epoll将输入/输出的功能分开了,是用了两个接口。epoll_ctr和epoll_wait。
1.1 epoll_create
#include <sys/epoll.h>
int epoll_create(int size);
作用:在内核创建一个epoll模型。epoll模型后面有介绍。
参数:size现在已经忽略,可以随便设置。
返回值:返回一个文件描述符,该文件描述的指向的是epoll模型。通过该文件描述符来对epoll模型进行管理。
注意:使用完epoll模型,需要调用close进行关闭。
1.2 epoll_ctl
#include <sys/epoll.h>
int epoll_ctl(int epfd, int op, int fd, struct epoll_event *event);
作用:用户通知内核,需要在epoll模型中添加/删除/修改文件的事件。
参数:
参数 | 含义 |
epfd | epoll_create返回值 |
op | 进行什么操作,用三个宏表示 |
fd | 需要监听的文件的文件描述符 |
event | 用户告诉内核需要监听什么事 |
返回值:成功返回0,失败返回-1。
第二个参数op的三个宏:
- EPOLL_CTL_ADD:注册一个新的文件到epoll模型中
- EPOLL_CTL_MOD:修改一个已经注册的文件描述符的监听事件
- EPOLL_CTL_DEL:从epoll模型中删除一个监听的文件。
第四个参数,struct epoll_event结构:
typedef union epoll_data {
void *ptr;
int fd;
uint32_t u32;
uint64_t u64;
} epoll_data_t;
struct epoll_event {
uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
};
events可以是以下几个宏的集合:
想对一个文件监听多个事件,可以使用多标志位法。多个宏按位或。
- EPOLLIN:表示对应文件描述符可读,包括对端SOCKET正常关闭
- EPOLLOUT:表示对应文件描述符可写。
- EPOLLPRI:表示对应文件描述符有紧急数据可读。
- EPOLLERR:表示对应文件描述符发生错误。
- EPOLLHUP:表述对应文件描述符被挂断。
- EPOLLET:将epoll设为边缘触发模式,这是相对于水平触发模式说的。
- EPOLLONESHOT:只监听一次事件,当监听完这次事件后,如果需要继续监听这个socket的话,需要再此报这个socket加入到epoll模型中。
主要使用:EPOLLIN和EPOLLOUT两个。
data:是一个联合体,可以用来用户定义。
1.3 epoll_wait
#include <sys/epoll.h>
int epoll_wait(int epfd, struct epoll_event *events,int maxevents, int timeout);
作用:等待某个文件的事件就绪。
参数:
参数 | 含义 |
epfd | epoll模型 |
events | 输出形参数,内核告诉用户,文件的哪些事件就绪,指针指向struct epoll_events数组,不可以是空指针。 |
maxevents | events大小,要小于epoll_create参数size的大小 |
timeout | 等待时间,单位毫秒。0:非阻塞等待,-1:阻塞等待,具体某一值:具体等待时间。 |
返回值:
- 大于0:已经就绪好的文件描述符的个数
- 等于0:超时
- 小于0:函数调用失败
参数struct epoll_event *events指向struct epoll_event数组,数组大小是我们用户设定的。
内核会将就绪文件的事件从数组0号下标开始往后放。因此如果有文件事件就绪,epoll_wait返回值num就代表,有从0开始到num之间个文件事件就绪。
但是我们用户并不知道内核在监听多少文件,有多少文件就绪,当我们数组大小比就绪的文件描述符个数小时,会不会有问题?
不会有问题,没有处理的文件的事件,下一次还会是就绪的,下一次会处理。
二.epoll工作原理
首先说明:
操作系统时如何知道收到数据了?
一般硬件可以产生中断,操作系统收到中断,根据对应的中断号,执行对应的操作。
例如:网卡收到数据,会形成中断,OS收到中断,会执行对应中断号的操作。
select和poll是如何知道监听的文件的事件就绪?
操作系统通过轮询检测所有监听的文件。就可以知道哪些文件的事件就绪了。
epoll是如何监听到文件事件的就绪?
epoll模型中有一种回调机制,这个回调机制是操作系统运用驱动层的功能,使得当某个文件的事件就绪时会通知操作系统,OS系统就会去执行回调函数。
epoll模型里包含什么?
epoll模型了包含一个红黑树,一个就绪队列和回调机制。
- 红黑树:保存了用户通知内核,需要监视文件的哪些事件。红黑树的节点里面保存了文件的文件描述和具体事件。红黑树保存的时键值对,键值key用文件描述符合适,不会重复。
- 就绪队列:里面保存的是就绪的事件。每一个节点代表就绪的事件。
- 回调机制:当某个文件事件就绪,操作系统通过回调机制,调用对应文件描述符的回调函数,形成一个就绪队列的节点,并连接到就绪队列中。
调用epoll_create的工作:
- 创建红黑树
- 创建回调机制
- 创建就绪队列
- 在内核中epoll模型的结构体为eventpoll,每调用一次epoll_create,就创建一一个epoll模型,也就在内核创建了一个eventpoll结构体。该结构体包含两个重要成员。
struct eventpoll{
....
/*红黑树的根节点,这颗树中存储着所有添加到epoll中的需要监控的事件*/
struct rb_root rbr;
/*双链表中则存放着将要通过epoll_wait返回给用户的满足条件的事件*/
struct list_head rdlist;
....
};
调用epoll_ctl工作:
- 操作红黑树。删除/修改/增加红黑树的节点。红黑树的节点,代表监视的事件。
- 形成对应文件描述符的回调函数。当事件就绪,调用回调函数,形成就绪队列节点,连接到就绪队列中。
调用epoll_wait工作:
- 首先检测就绪队列是否空。
- 不为空,将就绪队列里的就绪事件信息拷贝到用户缓冲区,并且从用户缓冲区数组0号下标开始往后放。
select和poll是通过轮询检测所有要监视的文件,检测是否有文件就绪。事件复杂度为O(n)。并且监视文件越多,效率越低。
epoll检测是否有文件就绪,时间复杂度为O(1)。只需要检测就绪队列是否不为空。
对于海量需要监视的文件,监视文件多了,就绪的概率也就高了。select,poll的效率会明显降低,需要频繁轮询检测监视的文件,并且每次调用select/poll需要将大量的文件描述符,从用户拷贝到内核。但是对于epoll,只需要检测就绪队列即可,并数据轻量级拷贝,不需要将需要监视的文件从用户重新拷贝到内核。
四.epoll优点
- 接口使用方便:虽然拆分成了3个接口,不需要每次循环重新设置关注的文件描述符,也做到输入输出参数的分离。
- 数据拷贝轻量化:不需要每次调用epoll都要从小将监视的文件重新从用户拷贝到内核,只需要调用epoll_ctl增加.删除.修改即可。select和poll每次调用都要重新将监视的文件重新从用户拷贝到内核。
- 回调机制:想知道文件是否就绪,不需要轮询检测,使用回调的方式,将就绪事件加入到就绪队列中,想知道是否有文件就绪,只需要检测就绪队列是否不为空。时间复制度为O(1)。并且不会因为文件的增多而效率降低。
- 监视的文件没有数量限制。
五.epoll的工作模式
epoll有两种工作模式,水平触发(LT)工作模式和边缘触发(ET)工作模式。
实际select和poll只支持LT工作模式,epoll即支持LT,也支持ET工作模式。
举个例子:
你妈妈叫你吃饭。
你妈妈叫你一次,你没有动,你妈妈会继续一直叫你吃饭。对应LT工作模式。
你妈妈叫你一次,你没有动,你妈妈之后不会再叫你了。对应ET工作模式。
5.1 水平触发(Level Triggered)工作模式——LT
epoll 默认情况下就是LT工作模式。
- 当epoll检测到socket上事件就绪,可以不立刻处理,或者只处理一部分。
- 下一次epoll仍然会认为该事件就绪。
- 直到事件被处理完。
- 支持阻塞和非阻塞读写。
5.2 边缘触发(Edge Triggered)工作模式——ET
当我们再添加文件到epoll模型中时,将事件events设置为EPOLLET事件,epoll对此文件为ET工作模式。
- 当epoll检测到文件事件就绪,必须立刻处理完,因为该文件的事件下一次就不会是就绪了。
- 也就是说,ET模式下,文件描述符上的事件就绪后,只有一次处理机会。
- 只支持非阻塞读写。(原因在下面)
工作再ET模式下,有一个问题:
当每次读数据的个数比发过来的数据个数少,由于该文件事件只就绪一次,导致没有读取的数据,后面不会就绪了,就丢失了。
比如:实际发过来1024字节,每次读215字节,剩下的512字节,由于下次不会就绪,不会读取了。剩下的512字节相当于丢失了。
所以我们需要一次性读取完发来的整个数据。
怎么知道读完了整个数据?
循环读取,当读到的数据个数等于我们期望的数据个数,说明后面可能还有数据,继续读;当实际读到的数据个数小于我们期望读取的数据个数时,肯定说明读完了发来的整个数据。
为什么只支持非阻塞读写?
但是当循环读取,下一次读取的数据为0时,再读数据时,会发生阻塞。
比如:发送2560字节,调用read,每次期望读取512字节,当循环读取5之后,第6次缓冲区里没有数据,说明该事件不就绪了,然而会再次调用read(因为上一次读到了期望的字节数),就会发生阻塞。
所以为了避免这种情况,ET模式只能用于非阻塞的读写。
如何让设置文件非阻塞?
调用fcntl接口。
5.3 ET和LT对比
- ET效率会比LT效率更高,epoll_wait返回次数少,但是这样也需要程序员一次响应就绪过程把发送的数据全部处理完。
- ET代码复杂度更高了。
- 但是如果LT工作模式下,响应就绪过程一次性也能将所有数据读取完,两者效率是差不多的。
六.epoll使用
- epoll_wait参数struct epoll_event是将就绪队列的信息拷贝到参数中,不需要再进行参数化。
- 客户端发送数据,一次性可能没有发送往数据,可能要发多次。之前再写select和poll服务器时,都有一个bug,读数据的缓冲区时一个局部变量,不能保存上一次发送来的数据,所以不同的文件需要其对应的缓冲区来来保存数据。
- 代码中定义了一个Bucket桶,来保存数据,每一个监视的文件都有一个自己的Bucket。用struct epoll_event的data里的ptr指向,这样就可以做到,既可以保存之前那的数据,还可以接受完发送的数据。
- 代码中bucket是用一个定长数组来接收,在实际应用中我们可以定义成request,返回时返回response。
Sock.hpp
#pragma once
#include <iostream>
#include <stdlib.h>
#include <unistd.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <netinet/in.h>
#include <arpa/inet.h>
#include <poll.h>
#include <sys/epoll.h>
#define BACKLOG 5
using namespace std;
class Sock{
public:
static int Socket(){
int sock = 0;
sock = socket(AF_INET, SOCK_STREAM, 0);
if(sock < 0){
cerr<<"socket error"<<endl;
exit(1);
}
return sock;
}
static void Bind(int sock, int port){
struct sockaddr_in local;
local.sin_family = AF_INET;
local.sin_port = htons(port);
local.sin_addr.s_addr = htons(INADDR_ANY);
if(bind(sock, (struct sockaddr *)&local, sizeof(local)) < 0){
cerr << "bind error"<<endl;
exit(2);
}
}
static void Listen(int sock){
if(listen(sock,BACKLOG) < 0){
cerr << "listen error"<<endl;
exit(3);
}
}
static int Accept(int sock){
struct sockaddr_in peer;
socklen_t len = sizeof(peer);
return accept(sock, (struct sockaddr *)&peer, &len);
}
static void SetSockOpt(int sock){
int opt =1;
setsockopt(sock, SOL_SOCKET, SO_REUSEADDR, &opt, sizeof(opt));
}
};
EpollServer.hpp
#pragma once
#include "Sock.hpp"
#define NUM 20
class Bucket{
public:
Bucket(int fd)
:_fd(fd)
,_pos(0)
{
_buff[0] = 0;
}
char _buff[20];//数据缓冲区
int _fd;//文件描述符
size_t _pos;//从pos位置开始保存到缓存区
};
class EpollServer{
private:
int _lsock;
int _port;
int _efd;
public:
EpollServer(int lsock = -1, int port = 8080, int efd = -1)
:_lsock(lsock)
,_port(port)
,_efd(efd)
{}
//将sock添加到epoll模型的红黑树中
void AddSock2Epoll(int sock, int event){
struct epoll_event ev;
ev.events = event;
if(sock == _lsock){
//连接套接字不需要开辟桶空间
ev.data.ptr = nullptr;
}
else{
//Bucket保存数据
ev.data.ptr = new Bucket(sock);
}
if(epoll_ctl(_efd, EPOLL_CTL_ADD, sock, &ev) < 0){
cerr << "Add error , close sock"<<endl;
close(sock);
}
}
void DelSock2Epoll(int sock){
epoll_ctl(_efd, EPOLL_CTL_DEL, sock, nullptr);
}
void InitServer(){
_lsock = Sock::Socket();
Sock::SetSockOpt(_lsock);
Sock::Bind(_lsock, _port);
Sock::Listen(_lsock);
//创建epoll模型
_efd = epoll_create(256);
if(_efd < 0){
cerr << "epoll create error"<<endl;
exit(5);
}
AddSock2Epoll(_lsock, EPOLLIN);
}
void Handler(struct epoll_event revent[], int n){
for(int i =0; i < n; i++){
if(revent[i].events & EPOLLIN){
//读
//连接套接字没有创建缓冲区
if(revent[i].data.ptr == nullptr){
//有连接
int sock = Sock::Accept(_lsock);
if(sock < 0){
cerr << "accpet error"<<endl;
}
else{
//加入Epoll模型
cout<<"get a link..."<<endl;
AddSock2Epoll(sock, EPOLLIN);
}
}
else{
Bucket *bk = (Bucket *)revent[i].data.ptr;
//IO就绪
int s = recv(bk->_fd, bk->_buff + bk->_pos, sizeof(bk->_buff)- bk->_pos, 0);
if(s > 0){
bk->_pos += s;
if(bk->_pos >= sizeof(bk->_buff)){
bk->_buff[sizeof(bk->_buff) -1] = 0;
}
else{
bk->_buff[bk->_pos] = 0;
}
cout << "client# "<<bk->_buff<<endl;
if(bk->_pos >= sizeof(bk->_buff)){
//修改事件为写就绪
revent[i].events = EPOLLOUT;
epoll_ctl(_efd, EPOLL_CTL_MOD, bk->_fd, &revent[i]);
}
}
else if(s == 0){
cerr << "client close..."<<endl;
close(bk->_fd);
DelSock2Epoll(bk->_fd);
//销毁bk开辟的空间,防止内存泄漏
delete bk;
}
else{
cerr << "recv error"<<endl;
}
}
}
else if(revent[i].events & EPOLLOUT){
//写
Bucket *bk = (Bucket *)revent[i].data.ptr;
bk->_buff[sizeof(bk->_buff) -1] = '\\n';
send(bk->_fd, bk->_buff, sizeof(bk->_buff), 0);
//关闭连接
DelSock2Epoll(bk->_fd);
close(bk->_fd);
delete bk;
}
else{
//其它事件
}
}
}
void Start(){
struct epoll_event revent[NUM];
int timeout = -1;
while(1){
int n =epoll_wait(_efd, revent, NUM, timeout);
switch(n){
case -1:
cerr << "epoll wait error" <<endl;
break;
case 0:
cerr << "epoll timeout" <<endl;
break;
default:
Handler(revent,n);
break;
}
}
}
~EpollServer(){
close(_lsock);
close(_efd);
}
};
EpollServer.cc
#include"EpollServer.hpp"
void Notice(){
cout<<"Notice:\\n\\t"<<"please port"<<endl;
}
int main(int argc, char *argv[]){
if(argc != 2){
Notice();
exit(4);
}
EpollServer *es = new EpollServer(atoi(argv[1]));
es->InitServer();
es->Start();
delete es;
return 0;
}
以上是关于高效IO——多路转接epoll的主要内容,如果未能解决你的问题,请参考以下文章
五种高阶IO模型以及多路转接技术(selectpoll和epoll)及其代码验证