0 设计
EpollPoller继承自Poller,为的是能够使用epoll和poll两种io函数。(这貌似是策略模式?记不清了)
1 源码
EpollPoller.h
#ifndef MUDUO_NET_POLLER_EPOLLPOLLER_H #define MUDUO_NET_POLLER_EPOLLPOLLER_H #include <muduo/net/Poller.h> #include <vector> struct epoll_event; namespace muduo { namespace net { // epoll 继承自poller ,主要是为了能够同时使用epoll和poll // 这里应该算是策略模式吧 class EPollPoller : public Poller { public: EPollPoller(EventLoop *loop); virtual ~EPollPoller(); virtual Timestamp poll(int timeoutMs, ChannelList *activeChannels); virtual void updateChannel(Channel *channel); virtual void removeChannel(Channel *channel); private: static const int kInitEventListSize = 16; static const char *operationToString(int op); void fillActiveChannels(int numEvents, ChannelList *activeChannels) const; void update(int operation, Channel *channel); int epollfd_; // 这个将会代替 epoll_event 的数组,使用begin()迭代器传入 wait // 是用来存储已经就绪的 epoll_event 的 typedef std::vector<struct epoll_event> EventList; EventList events_; //这里是有 channels_ 成员的 是一个map,从基类中继承而来 }; } } #endif // MUDUO_NET_POLLER_EPOLLPOLLER_H
EpollPoller.cc
#include <muduo/net/poller/EPollPoller.h> #include <muduo/base/Logging.h> #include <muduo/net/Channel.h> #include <boost/static_assert.hpp> #include <assert.h> #include <errno.h> #include <poll.h> #include <sys/epoll.h> #include <unistd.h> using namespace muduo; using namespace muduo::net; // boost中的BOOST_STATIC_ASSERT是在编译期的断言,也就是说在编译的时候就可以断言出错误。 // 主要是兼容 BOOST_STATIC_ASSERT(EPOLLIN == POLLIN); BOOST_STATIC_ASSERT(EPOLLPRI == POLLPRI); BOOST_STATIC_ASSERT(EPOLLOUT == POLLOUT); BOOST_STATIC_ASSERT(EPOLLRDHUP == POLLRDHUP); BOOST_STATIC_ASSERT(EPOLLERR == POLLERR); BOOST_STATIC_ASSERT(EPOLLHUP == POLLHUP); namespace { // 又是匿名空间 // 这三个值其实是 标识符,用来识别,一个channel 是新添加,还是待修改,还是之前添加过 // 其含义是是否在epoll 中被监听 const int kNew = -1; const int kAdded = 1; const int kDeleted = 2; } EPollPoller::EPollPoller(EventLoop *loop) : Poller(loop), epollfd_(::epoll_create1(EPOLL_CLOEXEC)), events_(kInitEventListSize) { if (epollfd_ < 0) { LOG_SYSFATAL << "EPollPoller::EPollPoller"; } } EPollPoller::~EPollPoller() { ::close(epollfd_); } // 实际的wait函数 // 第一个参数是超时时间,第二个参数是:所有已就绪的channel被填充到activeChannels // 超时时间 : 0会立即返回,-1将不确定,也有说法说是永久阻塞 Timestamp EPollPoller::poll(int timeoutMs, ChannelList *activeChannels) { LOG_TRACE << "fd total count " << channels_.size(); // 注意这里, epoll_event 结构体使用的是vector,传入的begin() int numEvents = ::epoll_wait(epollfd_, &*events_.begin(), static_cast<int>(events_.size()), timeoutMs); int savedErrno = errno; // 记录唤醒的时间 Timestamp now(Timestamp::now()); // 通过 epoll_wait 返回值来判断是否有文件描述符就绪 if (numEvents > 0) { // 当确实有就绪的时候,那么就开始 fillActiveChannels 填充到 activeChannels LOG_TRACE << numEvents << " events happened"; fillActiveChannels(numEvents, activeChannels); // 然后当传入的events 的空间太小的时候,也就是说,还有很多就绪的时间没有被传出来 // 那么就扩容events if (implicit_cast<size_t>(numEvents) == events_.size()) { events_.resize(events_.size() * 2); } } else if (numEvents == 0) { // 超时以后,没有返回 LOG_TRACE << "nothing happened"; } else { // 被信号中断 if (savedErrno != EINTR) { errno = savedErrno; LOG_SYSERR << "EPollPoller::poll()"; } } return now; } // 填充函数 void EPollPoller::fillActiveChannels(int numEvents, ChannelList *activeChannels) const { assert(implicit_cast<size_t>(numEvents) <= events_.size()); // 遍历每一个就绪的 epoll_event // 要知道 epoll_event 的 data.ptr 指针绑定的是文件描述符被封装的 channel for (int i = 0; i < numEvents; ++i) { // 这里直接通过 data.ptr 来获得channel即可 Channel *channel = static_cast<Channel *>(events_[i].data.ptr); // 下面的四句是用来判断channel 的正确性的 #ifndef NDEBUG int fd = channel->fd(); ChannelMap::const_iterator it = channels_.find(fd); assert(it != channels_.end()); assert(it->second == channel); #endif // channel 内设置发生的事件,他就等于 epoll_event 的events channel->set_revents(events_[i].events); activeChannels->push_back(channel); } } // 更新channel 。由channel 发起,被eventloop 转发 void EPollPoller::updateChannel(Channel *channel) { Poller::assertInLoopThread(); // index 是 channel 的状态,是新添加,还是已经在监听,还是之前间听过,后来移除监听了 const int index = channel->index(); LOG_TRACE << "fd = " << channel->fd() << " events = " << channel->events() << " index = " << index; // 当是新的或是之前监听过,后来移除了监听 // 两者的区别在于,新的channel 之前没有在epoll 中保存 // 而 del 的之前在 channels_ 中保存了,但是没有被放入epoll_ctl中监听 if (index == kNew || index == kDeleted) { int fd = channel->fd(); if (index == kNew) { assert(channels_.find(fd) == channels_.end()); // channels_ 是一个map 在应用层,保存了所有的 channel 指针,维护一套channel channels_[fd] = channel; } else // index == kDeleted { // 如果index 是del ,那么 channels_ 中有channel 的记录 assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); } // 然后都是设置 index 表示已添加到 epoll 中 channel->set_index(kAdded); // 更新个 epoll 去调用 epoll ctl update(EPOLL_CTL_ADD, channel); } else { // 如果是已经add 了。那么应该是要修改 // 下面四句是验证channel 的正确性 int fd = channel->fd(); (void)fd; assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); assert(index == kAdded); // 既然已经添加了,那么可能的修改就是修改监听的时间,或者不在监听 // 因此这里先判断是否是没有监听的事件了,如果是那么直接移除、 if (channel->isNoneEvent()) { update(EPOLL_CTL_DEL, channel); channel->set_index(kDeleted); } else { // 否则,就是要更新事件。 update(EPOLL_CTL_MOD, channel); } } } // 移除channel void EPollPoller::removeChannel(Channel *channel) { // 验证 channel 的正确性 Poller::assertInLoopThread(); int fd = channel->fd(); LOG_TRACE << "fd = " << fd; assert(channels_.find(fd) != channels_.end()); assert(channels_[fd] == channel); assert(channel->isNoneEvent()); // 继续验证,是判断 index ,也就是channel 要移除一定不是 new, // 因为new 还没有添加到 channels_ 中 int index = channel->index(); assert(index == kAdded || index == kDeleted); size_t n = channels_.erase(fd); (void)n; assert(n == 1); // 开始正式的判断 // 如果是 add ,那么就从epoll 中删除 if (index == kAdded) { update(EPOLL_CTL_DEL, channel); } // 然后都是设置为new ,表示没有添加在 channels_ 中 channel->set_index(kNew); // 一般这个channel 是要被析构了 } // 更新 void EPollPoller::update(int operation, Channel *channel) { // 每次都要传入,因此,每次都是新的结构体. struct epoll_event event; bzero(&event, sizeof event); // 读取channel中的要设置的events event.events = channel->events(); // 设置 data_ptr 指向封装的结构体 event.data.ptr = channel; int fd = channel->fd(); // 打印log LOG_TRACE << "epoll_ctl op = " << operationToString(operation) << " fd = " << fd << " event = { " << channel->eventsToString() << " }"; // 实际的 epoll_ctl if (::epoll_ctl(epollfd_, operation, fd, &event) < 0) { if (operation == EPOLL_CTL_DEL) { LOG_SYSERR << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd; } else { LOG_SYSFATAL << "epoll_ctl op =" << operationToString(operation) << " fd =" << fd; } } } // 打印状态 const char *EPollPoller::operationToString(int op) { switch (op) { case EPOLL_CTL_ADD: return "ADD"; case EPOLL_CTL_DEL: return "DEL"; case EPOLL_CTL_MOD: return "MOD"; default: assert(false && "ERROR op"); return "Unknown Operation"; } }
3 channels_ 存在的意思
epoll 没有办法保存已有的所有的在监听的文件描述符,因为epoll采用增量的形式传入epoll_event的,因此之前传入的就传入了,没有办法去获取出来。
因此使用 channels_ 来保存所有的 channel 也就是说所有的文件描述符。
用来在新添加和修改删除文件描述符的时候做验证。