0 设计
EventLoop 类,主要是用来管理一个进程中的channel、计时器、以及epoll 的类。
每个类只有一个,(因为如果有两个,那么一个eventloop 在loop()的时候,另一个eventloop 得不到执行)。
每一个channel或者说是每一个文件描述符都必须属于eventloop,换句话说,每一个文件描述符属于一个线程。(这是必然的,因为eventloop在代表了一个线程)
eventloop 类的主要工作,就是wait epoll,然后执行channel 的事件处理函数。
1 源码
#ifndef MUDUO_NET_EVENTLOOP_H #define MUDUO_NET_EVENTLOOP_H #include <vector> #include <boost/any.hpp> #include <boost/function.hpp> #include <boost/noncopyable.hpp> #include <boost/scoped_ptr.hpp> #include <muduo/base/Mutex.h> #include <muduo/base/CurrentThread.h> #include <muduo/base/Timestamp.h> #include <muduo/net/Callbacks.h> #include <muduo/net/TimerId.h> namespace muduo { namespace net { class Channel; class Poller; class TimerQueue; // 事件处理类 class EventLoop : boost::noncopyable { public: typedef boost::function<void()> Functor; EventLoop(); ~EventLoop(); // 里面一个while循环,直接开始工作. void loop(); // 别的线程调用eventloop 的quit,设置 loop() 中循环的判断条件 // 从而在下一次循环的时候停止 void quit(); // 返回上一次 epoll 的返回时间 Timestamp pollReturnTime() const { return pollReturnTime_; } int64_t iteration() const { return iteration_; } // 添加函数对象到eventloop线程中执行。 void runInLoop(const Functor &cb); // 当调用函数添加函数对象到eventloop中执行的时候 // 需要判断是否是在eventloop 线程中.如果是在当前eventloop 的线程中 // 那么直接执行,否则,就添加到队列中,然后唤醒epoll void queueInLoop(const Functor &cb); // 返回当前 等待执行函数对象的队列长度 size_t queueSize() const; // 下面三个函数,是用来添加计时器对象的 TimerId runAt(const Timestamp &time, const TimerCallback &cb); TimerId runAfter(double delay, const TimerCallback &cb); TimerId runEvery(double interval, const TimerCallback &cb); void cancel(TimerId timerId); // wakeup函数用来显示的唤醒 epoll ,主要是在 eventloop执行了quit // 还有添加了函数对象到队列中的时候 主动的唤醒 epoll // 能唤醒的原因在于,eventloop 对象持有一个 eventfd,注册在了epoll中 // 然后需要唤醒的时候,直接 eventfd 中写数据即可. void wakeup(); // 这两个函数被channel 的update 和remove 调用 // 然后,这俩函数由去调用epoll 的对应接口 void updateChannel(Channel *channel); void removeChannel(Channel *channel); // 用来判断一个 epoll 是否持有 这个 channel // 调用的是 epoll 的对象 bool hasChannel(Channel *channel); // 用来判断 绑定在eventloop 中的channel 的运行是否在 本eventloop 线程中 // 或是其他操作,需要属于本eventloop 线程 void assertInLoopThread() { if (!isInLoopThread()) { abortNotInLoopThread(); } } // 被上面的函数调用,就是简单的判断下初始化eventloop 的线程是不是当前运行函数的线程 bool isInLoopThread() const { return threadId_ == CurrentThread::tid(); } // eventHandling() 用来判断是否是在已就绪channel的事件处理函数。 // 因为channel的处理只能是在 eventloop 线程中,也就是说在 loop 处理的时候 // 如果这个时候 remove channel 那么要删除的channel 就是当前正在处理channel bool eventHandling() const { return eventHandling_; } // 没被用到,貌似 void setContext(const boost::any &context) { context_ = context; } const boost::any &getContext() const { return context_; } boost::any *getMutableContext() { return &context_; } static EventLoop *getEventLoopOfCurrentThread(); private: void abortNotInLoopThread(); // 这个函数是在,eventloop 主动唤醒 epoll 的时候处理eventfd 读事件的函数 void handleRead(); // waked up // 执行队列中的函数对象 void doPendingFunctors(); // 用来打印本次已就绪channel 发生的对应的时间,也就是去调用了channel 的tostring void printActiveChannels() const; // DEBUG typedef std::vector<Channel *> ChannelList; bool looping_; bool quit_; bool eventHandling_; bool callingPendingFunctors_; // iteration_ 是poller 被唤醒的次数 int64_t iteration_; // 初始化时候,本eventloop 所在的线程 const pid_t threadId_; // 上一次epoll被唤醒的时间戳 Timestamp pollReturnTime_; // 具体的epoll ,实现上可是用poll的,因此是一个继承体系 boost::scoped_ptr<Poller> poller_; // 计时器队列. boost::scoped_ptr<TimerQueue> timerQueue_; // eventfd,是eventloop主动唤醒epoll的fd int wakeupFd_; // 是分装eventfd的channel boost::scoped_ptr<Channel> wakeupChannel_; boost::any context_; // 这个容器被epoll 填充。装了所有已就绪的channel ChannelList activeChannels_; // 当前正在处理的 channel Channel *currentActiveChannel_; mutable MutexLock mutex_; // 添加到eventloop 中需要执行的函数对象,就在这里。 std::vector<Functor> pendingFunctors_; }; } } #endif // MUDUO_NET_EVENTLOOP_H
实现
#include <muduo/net/EventLoop.h> #include <muduo/base/Logging.h> #include <muduo/base/Mutex.h> #include <muduo/net/Channel.h> #include <muduo/net/Poller.h> #include <muduo/net/SocketsOps.h> #include <muduo/net/TimerQueue.h> #include <boost/bind.hpp> #include <signal.h> #include <sys/eventfd.h> #include <unistd.h> using namespace muduo; using namespace muduo::net; // 当定义一个命名空间时,可以忽略这个命名空间的名称: // 编译器在内部会为这个命名空间生成一个唯一的名字,而且还会为这个匿名的命名空间生成一条using指令。 // 命名空间都是具有external 连接属性的,只是匿名的命名空间产生的__UNIQUE_NAME__在别的文件中无法得到,这个唯一的名字是不可见的. namespace { // 使用了 __thread 每个线程保存自己的eventloop指针 __thread EventLoop *t_loopInThisThread = 0; // epoll 的超时时间 const int kPollTimeMs = 10000; // 是eventloop 的eventfd ,用来唤醒 epoll // 唤醒的时机在前面说了 int createEventfd() { int evtfd = ::eventfd(0, EFD_NONBLOCK | EFD_CLOEXEC); if (evtfd < 0) { LOG_SYSERR << "Failed in eventfd"; // 如果创建失败,那么直接结束程序,没有必要执行下去了 abort(); } return evtfd; } // 这是两条预处理执行令,表示发生 -Wold-style-cast 的时候忽略 // 然后到下一条. #pragma GCC diagnostic ignored "-Wold-style-cast" // 这个对象只是用一次。 // 是为了防止管道破裂的。因此在构造的时候,直接将SIGPIPE设置为忽略 // 并且初始化的时候就执行了 class IgnoreSigPipe { public: IgnoreSigPipe() { ::signal(SIGPIPE, SIG_IGN); // LOG_TRACE << "Ignore SIGPIPE"; } }; // 到这里的时候开启. #pragma GCC diagnostic error "-Wold-style-cast" IgnoreSigPipe initObj; } // 返回本线程的 eventloop 对象 EventLoop *EventLoop::getEventLoopOfCurrentThread() { return t_loopInThisThread; } EventLoop::EventLoop() : looping_(false), quit_(false), eventHandling_(false), callingPendingFunctors_(false), iteration_(0), threadId_(CurrentThread::tid()), poller_(Poller::newDefaultPoller(this)), // 使用一个工厂模式返回poll timerQueue_(new TimerQueue(this)), // 和计时器有关的 wakeupFd_(createEventfd()), // 创建eventloop 主动唤醒 epoll 的channel wakeupChannel_(new Channel(this, wakeupFd_)), currentActiveChannel_(NULL) { LOG_DEBUG << "EventLoop created " << this << " in thread " << threadId_; // 如果当前线程已经有一个eventloop 了那么就退出啊。 LOG_FATAL 应该会直接停了。 if (t_loopInThisThread) { LOG_FATAL << "Another EventLoop " << t_loopInThisThread << " exists in this thread " << threadId_; } else { t_loopInThisThread = this; } wakeupChannel_->setReadCallback( boost::bind(&EventLoop::handleRead, this)); // we are always reading the wakeupfd wakeupChannel_->enableReading(); } // 析构函数 EventLoop::~EventLoop() { LOG_DEBUG << "EventLoop " << this << " of thread " << threadId_ << " destructs in thread " << CurrentThread::tid(); // 先将 wakechannel 注销掉,然后直接关闭就行了。 // 因为也没什么需要回收的资源,都是使用指针指针. wakeupChannel_->disableAll(); wakeupChannel_->remove(); ::close(wakeupFd_); t_loopInThisThread = NULL; } void EventLoop::loop() { assert(!looping_); assertInLoopThread(); looping_ = true; quit_ = false; LOG_TRACE << "EventLoop " << this << " start looping"; // 使用 quit 判断是否应该继续循环 while (!quit_) { // activeChannels_ 用来填充就绪channel的 activeChannels_.clear(); // 传入了 activeChannels_ 的地址,由poll 填充已就绪的channel 到activeChannels_中 pollReturnTime_ = poller_->poll(kPollTimeMs, &activeChannels_); // iteration_ 是poller 被唤醒的次数 ++iteration_; // 根据日志级别,决定是否打印出所有的channel 的事件 if (Logger::logLevel() <= Logger::TRACE) { printActiveChannels(); } // 接下来遍历所有的 activeChannels_ 中的元素,去执行它们的 // handleEvent() 实际的执行channel 的事件处理函数 // 这里面可能有 TimerQueue 的channel ,用来处理已经超时的计时器 // 这些计时器,被 TimerQueue 通过 add 添加函数对象的形式添加到了队列中?或是直接执行了。 // 想一下,应该是直接执行了,因为是在本线程中。 // 嗯在for中直接执行了每一个超时的计时器 eventHandling_ = true; for (ChannelList::iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { // 这里保存一次 currentActiveChannel_ 的意义是 // 只是在删除额时候判断,删除的是否是这个channel、 // 还有一个没看懂 currentActiveChannel_ = *it; currentActiveChannel_->handleEvent(pollReturnTime_); } currentActiveChannel_ = NULL; eventHandling_ = false; // 执行add 到eventloop 的函数对象。 doPendingFunctors(); } LOG_TRACE << "EventLoop " << this << " stop looping"; looping_ = false; } // 退出。 void EventLoop::quit() { quit_ = true; // 推出的时候,直接wakeup 唤醒 epoll if (!isInLoopThread()) { wakeup(); } } void EventLoop::runInLoop(const Functor &cb) { // 添加函数对象,如果是在本线程中就直接执行,否则就添加到队列中 if (isInLoopThread()) { cb(); } else { queueInLoop(cb); } } // 添加到队列中 void EventLoop::queueInLoop(const Functor &cb) { { MutexLockGuard lock(mutex_); pendingFunctors_.push_back(cb); } // 如果不是在本队列中,或是在执行添加的函数对象 // 那么本次添加的就已经不能在本次epoll唤醒的时候调用了 // 因此这里直接wakeup。那么epoll 的wait直接被唤醒了 if (!isInLoopThread() || callingPendingFunctors_) { wakeup(); } } size_t EventLoop::queueSize() const { MutexLockGuard lock(mutex_); return pendingFunctors_.size(); } // 下面三个函数用来添加计时器的 TimerId EventLoop::runAt(const Timestamp &time, const TimerCallback &cb) { return timerQueue_->addTimer(cb, time, 0.0); } TimerId EventLoop::runAfter(double delay, const TimerCallback &cb) { Timestamp time(addTime(Timestamp::now(), delay)); return runAt(time, cb); } TimerId EventLoop::runEvery(double interval, const TimerCallback &cb) { Timestamp time(addTime(Timestamp::now(), interval)); return timerQueue_->addTimer(cb, time, interval); } // 取消计时器。 void EventLoop::cancel(TimerId timerId) { return timerQueue_->cancel(timerId); } // channel 调用的updatechannel,被转发了 void EventLoop::updateChannel(Channel *channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); poller_->updateChannel(channel); } // 同上 void EventLoop::removeChannel(Channel *channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); // 这里使用了 eventHandling_ ,如果是正在处理channel 的事件函数, // 此时又要移除channel,那么要溢出的channel 必须是本channel 或是不在 activechannels中的channel // 否则报错.这里为什么要这样处理? 一般来说都是本channel移除自己. if (eventHandling_) { assert(currentActiveChannel_ == channel || std::find(activeChannels_.begin(), activeChannels_.end(), channel) == activeChannels_.end()); } poller_->removeChannel(channel); } bool EventLoop::hasChannel(Channel *channel) { assert(channel->ownerLoop() == this); assertInLoopThread(); return poller_->hasChannel(channel); } // 当不是在eventloop 的线程执行 eventloop 的相关操作,就会调用该函数.end // 打印日志,然后退出 void EventLoop::abortNotInLoopThread() { LOG_FATAL << "EventLoop::abortNotInLoopThread - EventLoop " << this << " was created in threadId_ = " << threadId_ << ", current thread id = " << CurrentThread::tid(); } // 主动唤醒的函数,向eventfd 中写入数据。 void EventLoop::wakeup() { uint64_t one = 1; ssize_t n = sockets::write(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::wakeup() writes " << n << " bytes instead of 8"; } } // 主动唤醒epoll 后,需要处理eventfd 的读事件,因此读出来就行了 void EventLoop::handleRead() { uint64_t one = 1; ssize_t n = sockets::read(wakeupFd_, &one, sizeof one); if (n != sizeof one) { LOG_ERROR << "EventLoop::handleRead() reads " << n << " bytes instead of 8"; } } // 执行添加到eventloop 的函数对象。 // 这里没有使用拷贝,而是通过交换 vector 的方式。 // 更加效率 void EventLoop::doPendingFunctors() { std::vector<Functor> functors; callingPendingFunctors_ = true; { MutexLockGuard lock(mutex_); functors.swap(pendingFunctors_); } for (size_t i = 0; i < functors.size(); ++i) { functors[i](); } callingPendingFunctors_ = false; } // 打印所有就绪channel 事件的函数 void EventLoop::printActiveChannels() const { for (ChannelList::const_iterator it = activeChannels_.begin(); it != activeChannels_.end(); ++it) { const Channel *ch = *it; LOG_TRACE << "{" << ch->reventsToString() << "} "; } }
2 什么时候需要runinloop()
eventloop所在线程是实际进行IO操作的线程。
虽然,eventloop 被绑定在每一个channel,但是实际上channel 又被关联在各种其他的类中,比如TcpConnection 中,当TcpConnection在别的线程中,需要发送数据的时候,那么会调用send,此时TcpConnection判断运行函数的线程是否是关联的channel绑定的evenloop,如果是,那么直接执行,如果不是,那么runinloop(),通过bind的方式传入。因为send需要传入数据,而在bind中绑定数据即可。
对于这点,后面再添加。
因为涉及到一个什么时候析构的问题。