Linux epoll 源码分析 1

Posted 底层技术研究

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Linux epoll 源码分析 1相关的知识,希望对你有一定的参考价值。

本文将从源码角度分析epoll的实现机制,使用的内核版本为


➜  bionic git:(ffdd392b8196) git remote -v

origin git://git.launchpad.net/~ubuntu-kernel/ubuntu/+source/linux/+git/bionic (fetch)

origin git://git.launchpad.net/~ubuntu-kernel/ubuntu/+source/linux/+git/bionic (push)

➜  bionic git:(ffdd392b8196) git status 

HEAD detached at Ubuntu-4.15.0-45.48


有关如何找到对应的内核源码,请参考 。


epoll的api有三种,其作用分别为


epoll_create1 用来创建epoll实例。

epoll_ctl 用来添加/修改/删除文件的监听事件。

epoll_wait 用来等待监听事件的发生。


epoll的事件触发机制有两种,分别为 level-triggered 和 edge-triggered。


默认为 level-triggered,当用 epoll_ctl 添加或修改监听事件时,可通过 EPOLLET 来标识该事件为 edge-triggered。


我们先来看下epoll_create1方法


// fs/eventpoll.c

SYSCALL_DEFINE1(epoll_create1, int, flags)

{

  int error, fd;

  struct eventpoll *ep = NULL;

  struct file *file;

  ...

  error = ep_alloc(&ep);

  ...

  fd = get_unused_fd_flags(O_RDWR | (flags & O_CLOEXEC));

  ...

  file = anon_inode_getfile("[eventpoll]", &eventpoll_fops, ep,

         O_RDWR | (flags & O_CLOEXEC));

  ...

  fd_install(fd, file);

  return fd;

  ...

}


该方法的主要操作有:


1. 调用ep_alloc方法创建一个eventpoll实例,其类型为


// fs/eventpoll.c

struct eventpoll {

  ...

  /* 调用epoll_wait方法的线程在被堵塞之前会放相应的信息在这个队列里

     这样当有监听事件发生时,这些线程就可以被唤醒 */

  wait_queue_head_t wq;

  ...

  /* 被监听的socket文件有对应的事件生成后,就会被放到这个队列中 */

  struct list_head rdllist;


  /* 被监听的socket文件会被放到这个数据结构里,红黑树 */

  struct rb_root_cached rbr;

  ...

};


2. 调用get_unused_fd_flags方法找到一个未使用的fd,这个就是最终返回给我们的文件描述符。


3. 调用anon_inode_getfile方法创建一个file实例,其类型为


// include/linux/fs.h

struct file {

  ...

  // 这个struct里存放了各种函数指针,用来指向操作文件的各种函数

  // 比如read/write等。这样不同类型的文件,就可以有不同的函数实现

  const struct file_operations  *f_op;

  ...

  // struct file 里的数据字段存放的是所有file类型通用的数据

  // 而下面这个字段存放的是和具体文件类型相关的数据

  void      *private_data;

  ...

}


调用anon_inode_getfile方法传入的参数中,eventpoll_fops最终被赋值到上面的f_op字段,ep被赋值到上面的private_data字段。


4. 调用fd_install方法在内核中建立 fd 与 file 的对应关系,这样以后就可以通过fd来找到对应的file。


5. 返回fd给用户。


至此,epoll_create1方法结束。


我们再来看下epoll_wait方法


// fs/eventpoll.c

SYSCALL_DEFINE4(epoll_wait, int, epfd, struct epoll_event __user *, events,

    int, maxevents, int, timeout)

{

  int error;

  struct fd f;

  struct eventpoll *ep;

  ...

  /* 根据epfd找到对应的file */

  f = fdget(epfd);

  ...

  /* epoll_create1方法中把eventpoll实例放到了private_data字段中 */

  ep = f.file->private_data;


  /* Time to fish for events ... */

  error = ep_poll(ep, events, maxevents, timeout);

  ...

  return error;

}


该方法参数中,epfd为epoll_create1方法返回的fd,events为用户提供的 struct epoll_event 类型的数组,用于存放有监听事件发生的那些监听对象,maxevents 表示这个数组的长度,也表示epoll_wait方法最多可返回maxevents个事件就绪的监听对象。


该方法最后又调用了ep_poll方法,继续看下这个方法


// fs/eventpoll.c

static int ep_poll(struct eventpoll *ep, struct epoll_event __user *events,

       int maxevents, long timeout)

{

  ...

  wait_queue_entry_t wait;

  ...

  if (!ep_events_available(ep)) {

   ...

    init_waitqueue_entry(&wait, current);

    __add_wait_queue_exclusive(&ep->wq, &wait);


    for (;;) {

      ...

      if (ep_events_available(ep) || timed_out)

        break;

      ...

      if (!schedule_hrtimeout_range(to, slack, HRTIMER_MODE_ABS))

        timed_out = 1;

      ...

    }

    __remove_wait_queue(&ep->wq, &wait);

    ...

  }

  ...

  eavail = ep_events_available(ep);

  ...

  if (!res && eavail &&

      !(res = ep_send_events(ep, events, maxevents)) && !timed_out)

    goto fetch_events;


  return res;

}


该方法的主要操作有


1. 判断是否有监听事件就绪,如果有则直接调用ep_send_events方法把就绪对象拷贝到events里,然后返回。


2. 如果没有,则先调用 init_waitqueue_entry 方法初始化wait变量,其中current参数为线程私有变量,线程相关的数据会放到这个变量中,同时,通过这个变量也能找到相应的线程。


我们先看下wait变量的类型


// include/linux/wait.h

typedef struct wait_queue_entry wait_queue_entry_t;

...

struct wait_queue_entry {

  unsigned int    flags;

  void      *private;

  wait_queue_func_t func;

  struct list_head  entry;

};


再看下 init_waitqueue_entry 方法


// include/linux/wait.h

static inline void init_waitqueue_entry(struct wait_queue_entry *wq_entry, struct task_struct *p)

{

  wq_entry->flags   = 0;

  wq_entry->private = p;

  wq_entry->func    = default_wake_function;

}


这里的 default_wake_function 方法就是用来唤醒 p 变量对应的线程的。该方法的实现后面我们会讲到。


3. 初始化完wait变量之后,把它放到eventpoll的wq队列中,这个上面我们也有提到过。


4. 然后进入for循环,其逻辑为,检查是否有监听事件就绪,如果没有,则调用 schedule_hrtimeout_range 方法,使当前线程进入休眠状态。


5. 当各种情况,比如signal、timeout、监听事件发生,导致该线程被唤醒,则会再进入下一次for循环,并检查监听事件是否就绪,如果就绪了,则跳出for循环,同时把wait变量从eventpoll的wq队列中移除。


6. 调用 ep_send_events 方法把就绪事件的对象拷贝到用户提供的events数组中,然后返回。


这里我们再着重看下 ep_send_events 方法。


// fs/eventpoll.c

static int ep_send_events(struct eventpoll *ep,

        struct epoll_event __user *events, int maxevents)

{

  struct ep_send_events_data esed;


  esed.maxevents = maxevents;

  esed.events = events;


  return ep_scan_ready_list(ep, ep_send_events_proc, &esed, 0, false);

}


该方法又调用了 ep_scan_ready_list 方法,其中参数 ep_send_events_proc 为一个回调方法,在 ep_scan_ready_list 方法中会使用到,后面会再详细说。


// fs/eventpoll.c

static int ep_scan_ready_list(struct eventpoll *ep,

            int (*sproc)(struct eventpoll *,

             struct list_head *, void *),

            void *priv, int depth, bool ep_locked)

{

  ...

  LIST_HEAD(txlist);

  ...

  list_splice_init(&ep->rdllist, &txlist);

  ...

  error = (*sproc)(ep, &txlist, priv);

  ...

  return error;

}


该方法的大体逻辑是,将eventpoll中的rdllist列表内容转移到txlist列表中,同时把rdllist列表置为空,现在txlist就持有了所有有就绪事件的对象。


然后调用上面的回调方法 ep_send_events_proc,将该列表传入其中。


我们再看下这个回调方法。


// fs/eventpoll.c

static int ep_send_events_proc(struct eventpoll *ep, struct list_head *head,

             void *priv)

{

  struct ep_send_events_data *esed = priv;

  ...

  struct epoll_event __user *uevent;

  ...

  for (eventcnt = 0, uevent = esed->events;

       !list_empty(head) && eventcnt < esed->maxevents;) {

    epi = list_first_entry(head, struct epitem, rdllink);

    ...

    list_del_init(&epi->rdllink);


    revents = ep_item_poll(epi, &pt, 1);

    ...

    if (revents) {

      if (__put_user(revents, &uevent->events) ||

          __put_user(epi->event.data, &uevent->data)) {

        ...

      }

      eventcnt++;

      uevent++;

      if (epi->event.events & EPOLLONESHOT)

       ...

      else if (!(epi->event.events & EPOLLET)) {

        /*

         * 如果是 level-triggered,该对象还会被添加到就绪列表里

         * 这样下次调用 epoll_wait 还会检查这个对象

         */

        list_add_tail(&epi->rdllink, &ep->rdllist);

       ...

      }

    }

  }


  return eventcnt;

}


该方法的操作大体为


1. 遍历head就绪列表中的所有对象,对其调用 ep_item_poll 方法,真正的去检查我们关心的那些事件是否存在。


对于tcp socket对象,这个方法最终会调用 tcp_poll 方法,由于该方法涉及的都是tcp相关的内容,我们以后会另起文章再讲。


2. 如果有我们感兴趣的事件,则将该事件拷贝到用户event中。


3. 如果该监听对象是 level-triggered 模式,则会把该对象再加入到就绪列表中,这样下次再调用 epoll_wait 方法,还会检查这些对象。


这也是 level-triggered 和 edge-triggered 在代码上表现出来的本质区别。


4. 所有监听对象检查完毕后,此时满足条件的对象已经被拷贝到用户提供的events里,到这里方法就可以返回了。


至此,epoll_wait 方法也分析完毕。


有关 epoll_ctl 方法及其他epoll内容,我们会在另起文章再来分析。

以上是关于Linux epoll 源码分析 1的主要内容,如果未能解决你的问题,请参考以下文章

Linux epoll 源码分析 3

Linux epoll 源码分析 2

epoll源码分析以及在Redis中的实现

Linux epoll模型详解及源码分析

Linux epoll模型详解及源码分析

面试必问的epoll技术,从内核源码出发彻底搞懂epoll