epoll为啥这么快?epoll的实现原理是啥?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了epoll为啥这么快?epoll的实现原理是啥?相关的知识,希望对你有一定的参考价值。
参考技术A以一个生活中的例子来解释.假设你在大学中读书,要等待一个朋友来访,而这个朋友只知道你在A号楼,但是不知道你具体住在哪里,于是你们约好了在A号楼门口见面.如果你使用的阻塞IO模型来处理这个问题,那么你就只能一直守候在A号楼门口等待朋友的到来,...
epoll是Linux内核为处理大批量文件描述符而作了改进的poll,是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率。另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select/poll那种IO事件的水平触发(Level Triggered)外,还提供了边缘触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
select 最不能忍受的是一个进程所打开的FD是有一定限制的,由FD_SETSIZE设置,默认值是2048。对于那些需要支持的上万连接数目的IM服务器来说显然太少了。这时候你一是可以选择修改这个宏然后重新编译服务器代码,不过资料也同时指出这样会带来网络效率的下降,二是可以选择多进程的解决方案(传统的Apache方案),不过虽然linux上面创建进程的代价比较小,但仍旧是不可忽视的,加上进程间数据同步远比不上线程间同步的高效,所以也不是一种完美的方案。不过 epoll则没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子,在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max查看,一般来说这个数目和系统内存关系很大。
Linux epoll 实现原理
文章目录
epoll 是 Linux 平台下的一种特有的 IO 多路复用的实现方式,与传统的 select/poll 相比,epoll 在性能上有很大的提升。本文主要讲解 epoll 的实现原理。
1.epoll 的用法
先复习下 epoll 的用法。
如下的代码中,先用 epoll_create 创建一个 epoll 文件描述符 epfd,再通过 epoll_ctl 将需要监听的 socket 添加到 epfd 中,最后调用 epoll_wait 等待数据。
int s = socket(AF_INET, SOCK_STREAM, 0);
bind(s, ...);
listen(s, ...)
int epfd = epoll_create(...);
epoll_ctl(epfd, ...); // 将所有需要监听的 socket 添加到 epfd 中。
#define MAX_EVENTS 10
struct epoll_event events[MAX_EVENTS];
while(1)
int n = epoll_wait(epfd, events, ...); // 返回就绪的 socket 数量。
for(int i = 0; i < n; ++i)
if (events[n].data.fd == listen_sock)
// 处理
创建 socket 时,操作系统会创建一个由文件系统管理的 socket 对象。这个socket对象包含了发送缓冲区、接收缓冲区、等待队列等成员。等待队列是个非常重要的结构,它指向所有需要等待该 socket 事件的进程。
2.epoll 的创建
要使用 epoll 首先需要调用 epoll_create() 函数创建一个 epoll 的文件描述符,epoll_create() 函数原型如下:
int epoll_create(int size);
参数 size 是由于历史原因遗留下来的,自 Linux 2.6.8 以来,已不起作用,但必须大于零。
当用户调用 epoll_create() 函数时,会进入到内核空间,并且调用 do_epoll_create() 内核函数来创建 epoll 句柄,do_epoll_create() 函数代码如下:
/*
* Open an eventpoll file descriptor.
*/
static int do_epoll_create(int flags)
int error, fd;
struct eventpoll *ep = NULL;
struct file *file;
/* Check the EPOLL_* constant for consistency. */
BUILD_BUG_ON(EPOLL_CLOEXEC != O_CLOEXEC);
if (flags & ~EPOLL_CLOEXEC)
return -EINVAL;
/*
* Create the internal data structure ("struct eventpoll").
*/
error = ep_alloc(&ep);
if (error < 0)
return error;
/*
* Creates all the items needed to setup an eventpoll file. That is,
* a file structure and a free file descriptor.
*/
fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));
if (fd < 0)
error = fd;
goto out_free_ep;
file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,
O_RDWR | (flags & O_CLOEXEC));
if (IS_ERR(file))
error = PTR_ERR(file);
goto out_free_fd;
ep->file = file;
fd_install(fd, file);
return fd;
out_free_fd:
put_unused_fd(fd);
out_free_ep:
ep_free(ep);
return error;
sys_epoll_create() 主要做两件事情:
- 调用 ep_alloc() 函数创建并初始化一个 eventpoll 对象。
- 调用 anon_inode_getfd() 函数把 eventpoll 对象映射到一个文件描述符,并返回这个文件描述符。
3.epoll 对象结构
从 do_epoll_create() 源码可以看出,epoll 对象实际上是一个 eventpoll,定义如下:
struct eventpoll
/* Protect the access to this structure */
spinlock_t lock;
/*
* This mutex is used to ensure that files are not removed
* while epoll is using them. This is held during the event
* collection loop, the file cleanup path, the epoll file exit
* code and the ctl operations.
*/
struct mutex mtx;
/* Wait queue used by sys_epoll_wait() */
wait_queue_head_t wq;
/* Wait queue used by file->poll() */
wait_queue_head_t poll_wait;
/* List of ready file descriptors */
struct list_head rdllist;
/* RB tree root used to store monitored fd structs */
struct rb_root rbr;
/*
* This is a single linked list that chains all the "struct epitem" that
* happened while transferring ready events to userspace w/out
* holding ->lock.
*/
struct epitem *ovflist;
/* wakeup_source used when ep_scan_ready_list is running */
struct wakeup_source *ws;
/* The user that created the eventpoll descriptor */
struct user_struct *user;
struct file *file;
/* used to optimize loop detection check */
int visited;
struct list_head visited_list_link;
其中有几个成员需要重点关注的:
- 被监听的 Socket 列表(rbr )
通过 epoll_ctl(2) 将监控的 socket 添加到以红黑树保存的列表中。
- 等待队列(wq)
和 socket 一样,它也会有等待队列,当调用 epoll_wait(epfd, …) 时会把进程添加到 eventpoll 对象的 wq 等待队列中。
- 就绪 socket 列表(rdllist)
保存已经就绪的 socket 文件描述符列表。当程序执行到 epoll_wait 时,如果 rdlist 已经存在就绪的 socket,那么 epoll_wait 直接返回,如果 rdlist 为空,阻塞进程。
下图展示了 eventpoll 对象与被监听的文件关系:
由于被监听的文件是通过 epitem 对象来管理的,所以上图中的节点都是以 epitem 对象的形式存在的。为什么要使用红黑树来管理被监听的文件呢?这是为了能够通过文件描述符快速查找到其对应的 epitem 对象。
4.向 epoll 添加文件描述符
前面介绍了怎么创建 epoll,接下来介绍一下怎么向 epoll 添加要监听的 socket。
通过调用 epoll_ctl() 函数可以向 epoll 添加要监听的文件,其原型如下:
long epoll_ctl(int epfd, int op, int fd,struct epoll_event *event);
下面说明一下各个参数的作用:
- epfd: 通过调用 epoll_create() 函数返回的文件描述符。
- op: 要进行的操作,有 3 个选项。
- EPOLL_CTL_ADD:表示要进行添加操作。
- EPOLL_CTL_DEL:表示要进行删除操作。
- EPOLL_CTL_MOD:表示要进行修改操作。
- fd: 要监听的文件描述符。
- event: 告诉内核需要监听什么事件。其定义如下:
struct epoll_event
__uint32_t events; /* Epoll events */
epoll_data_t data; /* User data variable */
;
events 可以是以下几个宏的集合:
- EPOLLIN :表示对应的文件描述符可以读(包括对端 SOCKET 正常关闭);
- EPOLLOUT:表示对应的文件描述符可以写;
- EPOLLPRI:表示对应的文件描述符有紧急的数据可读;
- EPOLLERR:表示对应的文件描述符发生错误;
- EPOLLHUP:表示对应的文件描述符被挂断;
- EPOLLET:将 EPOLL 设为边缘触发(Edge Triggered)模式,这是相对于水平触发(Level Triggered)来说的。
- EPOLLONESHOT:只监听一次事件,当监听完这次事件之后,如果还需要继续监听这个 socket 的话,需要再次把这个 socket 加入到 EPOLL 队列里。
data 用来保存用户自定义数据。
epoll_ctl() 函数会调用 do_epoll_ctl() 内核函数,do_epoll_ctl() 内核函数的实现如下:
int do_epoll_ctl(int epfd, int op, int fd, struct epoll_event *epds,
bool nonblock)
...
f = fdget(epfd);
if (!f.file)
goto error_return;
/* Get the "struct file *" for the target file */
tf = fdget(fd);
if (!tf.file)
goto error_fput;
...
/*
* At this point it is safe to assume that the "private_data" contains
* our own data structure.
*/
ep = f.file->private_data;
error = epoll_mutex_lock(&ep->mtx, 0, nonblock);
if (error)
goto error_tgt_fput;
...
/*
* Try to lookup the file inside our RB tree. Since we grabbed "mtx"
* above, we can be sure to be able to use the item looked up by
* ep_find() till we release the mutex.
*/
epi = ep_find(ep, tf.file, fd);
error = -EINVAL;
switch (op)
case EPOLL_CTL_ADD:
if (!epi)
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_insert(ep, epds, tf.file, fd, full_check);
else
error = -EEXIST;
break;
case EPOLL_CTL_DEL:
if (epi)
error = ep_remove(ep, epi);
else
error = -ENOENT;
break;
case EPOLL_CTL_MOD:
if (epi)
if (!(epi->event.events & EPOLLEXCLUSIVE))
epds->events |= EPOLLERR | EPOLLHUP;
error = ep_modify(ep, epi, epds);
else
error = -ENOENT;
break;
mutex_unlock(&ep->mtx);
error_tgt_fput:
if (full_check)
clear_tfile_check_list();
loop_check_gen++;
mutex_unlock(&epmutex);
fdput(tf);
error_fput:
fdput(f);
error_return:
return error;
sys_epoll_ctl() 函数会根据传入不同 op 的值来进行不同操作,比如传入 EPOLL_CTL_ADD 表示要进行添加操作,那么就调用 ep_insert() 函数来进行添加操作。
我们继续来分析添加操作 ep_insert() 函数的实现:
static int ep_insert(struct eventpoll *ep, struct epoll_event *event,
struct file *tfile, int fd)
...
error = -ENOMEM;
// 申请一个 epitem 对象
if (!(epi = kmem_cache_alloc(epi_cache, GFP_KERNEL)))
goto error_return;
// 初始化 epitem 对象
INIT_LIST_HEAD(&epi->rdllink);
INIT_LIST_HEAD(&epi->fllink);
INIT_LIST_HEAD(&epi->pwqlist);
epi->ep = ep;
ep_set_ffd(&epi->ffd, tfile, fd);
epi->event = *event;
epi->nwait = 0;
epi->next = EP_UNACTIVE_PTR;
epq.epi = epi;
// 等价于: epq.pt->qproc = ep_ptable_queue_proc
init_poll_funcptr(&epq.pt, ep_ptable_queue_proc);
// 调用被监听文件的 poll 接口.
// 这个接口又各自文件系统实现, 如socket的话, 那么这个接口就是 tcp_poll().
revents = tfile->f_op->poll(tfile, &epq.pt);
...
ep_rbtree_insert(ep, epi); // 把 epitem 对象添加到epoll的红黑树中进行管理
spin_lock_irqsave(&ep->lock, flags);
// 如果被监听的文件已经可以进行对应的读写操作
// 那么就把文件添加到epoll的就绪队列 rdllink 中, 并且唤醒调用 epoll_wait() 的进程.
if ((revents & event->events) && !ep_is_linked(&epi->rdllink))
list_add_tail(&epi->rdllink, &ep->rdllist);
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
if (waitqueue_active(&ep->poll_wait))
pwake++;
spin_unlock_irqrestore(&ep->lock, flags);
...
return 0;
...
被监听的文件是通过 epitem 对象进行管理的,也就是说被监听的文件会被封装成 epitem 对象,然后会被添加到 eventpoll 对象的红黑树中进行管理(如上述代码中的 ep_rbtree_insert(ep, epi))。
tfile->f_op->poll(tfile, &epq.pt) 这行代码的作用是调用被监听文件的 poll() 接口,如果被监听的文件是一个 socket 句柄,那么就会调用 tcp_poll(),我们来看看 tcp_poll() 做了什么操作:
unsigned int tcp_poll(struct file *file, struct socket *sock, poll_table *wait)
struct sock *sk = sock->sk;
...
poll_wait(file, sk->sk_sleep, wait);
...
return mask;
每个 socket 对象都有个等待队列用于存放等待 socket 状态更改的进程。
从上述代码可以知道,tcp_poll() 调用了 poll_wait() 函数,而 poll_wait() 最终会调用 ep_ptable_queue_proc() 函数,ep_ptable_queue_proc() 函数实现如下:
static void ep_ptable_queue_proc(struct file *file,
wait_queue_head_t *whead, poll_table *pt)
struct epitem *epi = ep_item_from_epqueue(pt);
struct eppoll_entry *pwq;
if (epi->nwait >= 0 && (pwq = kmem_cache_alloc(pwq_cache, GFP_KERNEL)))
init_waitqueue_func_entry(&pwq->wait, ep_poll_callback);
pwq->whead = whead;
pwq->base = epi;
add_wait_queue(whead, &pwq->wait);
list_add_tail(&pwq->llink, &epi->pwqlist);
epi->nwait++;
else
epi->nwait = -1;
ep_ptable_queue_proc() 函数主要工作是把当前 epitem 对象添加到 socket 对象的等待队列中,并且设置唤醒函数为 ep_poll_callback(),也就是说,当 socket 状态发生变化时,会触发调用 ep_poll_callback() 函数。
ep_poll_callback() 函数实现如下:
static int ep_poll_callback(wait_queue_t *wait, unsigned mode, int sync, void *key)
...
// 把就绪的文件添加到就绪队列中
list_add_tail(&epi->rdllink, &ep->rdllist);
is_linked:
// 唤醒调用 epoll_wait() 而被阻塞的进程。
if (waitqueue_active(&ep->wq))
wake_up_locked(&ep->wq);
...
return 1;
ep_poll_callback() 函数的主要工作是把就绪的文件描述符添加到 eventepoll 对象的就绪队列中,然后唤醒调用 epoll_wait() 被阻塞的进程。
5.阻塞和唤醒进程
把被监听的文件描述符添加到 epoll 后,就可以通过调用 epoll_wait() 等待被监听的文件状态发生改变。epoll_wait() 调用会阻塞当前进程,当被监听的文件状态发生改变时,epoll_wait() 调用便会返回。
epoll_wait() 系统调用的原型如下:
long epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
各个参数的意义:
- epfd: 调用 epoll_create() 函数创建的epoll句柄。
- events: 用来存放就绪文件列表。
- maxevents: events 数组的大小。
- timeout: 设置等待的超时时间。
epoll_wait() 函数会调用 sys_epoll_wait() 内核函数,而 sys_epoll_wait() 函数最终会调用 ep_poll() 函数,我们来看看 ep_poll() 函数的实现:
static int ep_poll(struct eventpoll *ep,
struct epoll_event __user *events, int maxevents, long timeout)
...
// 如果就绪文件列表为空
if (list_empty(&ep->rdllist))
// 把当前进程添加到epoll的等待队列中
init_waitqueue_entry(&wait, current);
wait.flags |= WQ_FLAG_EXCLUSIVE;
__add_wait_queue(&ep->wq, &wait);
for (;;)
set_current_state(TASK_INTERRUPTIBLE); // 把当前进程设置为睡眠状态
if (!list_empty(&ep->rdllist) || !jtimeout) // 如果有就绪文件或者超时, 退出循环
break;
if (signal_pending(current)) // 接收到信号也要退出
res = -EINTR;
break;
spin_unlock_irqrestore(&ep->lock, flags);
jtimeout = schedule_timeout(jtimeout); // 让出CPU, 切换到其他进程进行执行
spin_lock_irqsave(&ep->lock, flags);
// 有3种情况会执行到这里:
// 1. 被监听的文件集合中有就绪的文件
// 2. 设置了超时时间并且超时了
// 3. 接收到信号
__remove_wait_queue(&ep->wq, &wait);
set_current_state(TASK_RUNNING);
/* 是否有就绪的文件? */
eavail = !list_empty(&ep->rdllist);
spin_unlock_irqrestore(&ep->lock, flags);
if (!res && eavail
&& !(res = ep_send_events(ep, events, maxevents)) && jtimeout)
goto retry;
return res;
ep_poll() 函数主要做以下几件事:
- 判断被监听的文件集合中是否有就绪的文件,如果有就返回。
- 如果没有就把当前进程添加到 epoll 的等待队列中,并且进入睡眠。
- 进程会一直睡眠直到有以下几种情况发生:
- 被监听的文件集合中有就绪的文件
- 设置了超时时间并且超时了
- 接收到信号
- 如果有就绪的文件,那么就调用 ep_send_events() 函数把就绪文件复制到 events 参数中。
- 返回就绪文件描述符个数。
6.小结
下面通过文字来描述一下这个 epoll 实现 IO 多路复用的整个过程:
- 通过调用 epoll_create() 函数创建并初始化一个 eventpoll 对象。
- 通过调用 epoll_ctl() 函数把被监听的文件描述符 (如 socket 文件描述符) 封装成 epitem 对象并且添加到 eventpoll 对象的红黑树中进行管理。
- 通过调用 epoll_wait() 函数等待被监听的文件状态发生改变。
- 当被监听的文件状态发生改变时(如 socket 接收到数据):
- 会把文件描述符对应 epitem 对象添加到 eventpoll 对象的就绪队列 rdllist 中。并且把就绪队列的文件列表复制到 epoll_wait() 函数的 events 参数中。
- 操作系统将该 socket 等待队列上的进程重新放回到工作队列,该进程变成运行状态,即唤醒调用 epoll_wait() 函数被阻塞(睡眠)的进程。
参考文献
epoll_create(2) - Linux manual page - man7.org
linux内核Epoll 实现原理
Linux source code (v6.0) - Elixir Bootlin
如果这篇文章说不清epoll的本质,那就过来掐死我吧!
以上是关于epoll为啥这么快?epoll的实现原理是啥?的主要内容,如果未能解决你的问题,请参考以下文章