zmq笔记二: io线程和poller_t
Posted wqchen@
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zmq笔记二: io线程和poller_t相关的知识,希望对你有一定的参考价值。
int major, minor, patch;
zmq_version(&major, &minor, &patch); //4.2.0
本文主要是分析代码,方便自己日后查阅.
=========================================
在上一篇中讲到io_thread_t的线程循环函数实际上调用的,是根据不同平台下的首选I/O多路复用(select_t/poll_t/epoll_t/kqueue_t)的成员函数loop().
怎样确定选用哪种I/O多路复用,由一些预编译宏确定,请看poller.hpp头文件.
本文是在windows平台下进行分析. windows下选用的是select_t,并不是iocp.
1. I/O线程
io_thread_t有三个成员变量:
// I/O thread accesses incoming commands via this mailbox. mailbox_t mailbox; //接收命令消息的邮箱,mailbox相关资料会在后面的文章展开介绍. 当需要和io_thread_t通信时,给它的邮箱发一个command_t命令 // Handle associated with mailbox\' file descriptor. poller_t::handle_t mailbox_handle; //与邮箱绑定的句柄 // I/O multiplexing is performed using a poller object. poller_t *poller; //选用的i/o多路复用
io_thread_t这个类的功能很简洁,主要操作有: 线程开启,线程结束,处理在mailbox里的命令队列的消息(in_events函数).而mailbox里有消息待处理,是通过mailbox的fd状态可读来进行通知的,这个fd就是io_thread_t的成员变量mailbox_handle.
zmq::io_thread_t::io_thread_t (ctx_t *ctx_, uint32_t tid_) : object_t (ctx_, tid_) { poller = new (std::nothrow) poller_t (*ctx_); alloc_assert (poller); mailbox_handle = poller->add_fd (mailbox.get_fd (), this); poller->set_pollin (mailbox_handle);//初始化时加进去poller_t的可读fd集合了. }
2. poller_t
poller_t实际上是一个typedef的类型:
typedef select_t poller_t;
typedef epoll_t poller_t;
...
它根据不同平台下的首选I/O多路复用(select_t/poll_t/epoll_t/kqueue_t).本文只分析select_t.
就select_t而言,在windows和linux平台下,套接字集合的管理有所不同,但原理也差不多. 在select_t里有两个平台无关的通用的结构体:
// Internal state. struct fds_set_t //select_t对感兴趣的fd的事件集合管理 { fds_set_t (); fds_set_t (const fds_set_t& other_); fds_set_t& operator=(const fds_set_t& other_); // Convinient method to descriptor from all sets. void remove_fd (const fd_t& fd_); fd_set read; fd_set write; fd_set error; }; struct fd_entry_t //与fd对应的事件处理对象(可理解成实现了i_poll_events相关接口并需要在io线程处理事件的对象) { fd_t fd; zmq::i_poll_events* events; }; typedef std::vector<fd_entry_t> fd_entries_t;
#if defined ZMQ_HAVE_WINDOWS ...... #else fd_entries_t fd_entries; //fd对应的事件对象集合 fds_set_t fds_set; //select系统调用需要的所有感兴趣的fd集合 fd_t maxfd; //select系统调用需要的最大fd值 bool retired; //是否需要移除fd对应的事件对象,如果为true,则从fd_entries删除fd所对应的事件对象 #endif
poller_t还有一个thread_t成员变量worker,它才是系统线程的包装 (io_thread_t.poller->worker).worker线程开启后,实际执行的就是poller_t:loop()函数.
// Handle of the physical thread doing the I/O work. thread_t worker;
poller_t对某些事件对象(实现了i_poll_events:in_events接口)感兴趣,就以fd为key,加进去集合fd_entries_t.同时把fd加到fds_set.error集合里,监听fd的错误事件.
zmq::select_t::handle_t zmq::select_t::add_fd (fd_t fd_, i_poll_events *events_) { fd_entry_t fd_entry; fd_entry.fd = fd_; fd_entry.events = events_; #if defined ZMQ_HAVE_WINDOWS ...... #else fd_entries.push_back (fd_entry); FD_SET (fd_, &fds_set.error); if (fd_ > maxfd) maxfd = fd_; #endif adjust_load (1); //fd数量调整,原子增减操作 return fd_; }
需要注意的是,add_fd并没有把fd放进fds_set.read和fds_set.write,也就是说add_fd加进去的fd并不能被select监听到读写事件.
但是删除fd时,会把fd相关的信息都从poller_t里移除掉.
void zmq::select_t::rm_fd (handle_t handle_) { #if defined ZMQ_HAVE_WINDOWS ...... #else fd_entries_t::iterator fd_entry_it; for (fd_entry_it = fd_entries.begin (); fd_entry_it != fd_entries.end (); ++fd_entry_it) if (fd_entry_it->fd == handle_) //遍历集合找到目标元素 break; zmq_assert (fd_entry_it != fd_entries.end ()); fd_entry_it->fd = retired_fd; //标记设置为移除,注意找到目标元素后并没有立即从vector里remove掉,而是标记retired为true,在select系统调用完成后统一移除. fds_set.remove_fd (handle_); //从select集合里去掉 if (handle_ == maxfd) { //更新最大的fd值 maxfd = retired_fd; for (fd_entry_it = fd_entries.begin (); fd_entry_it != fd_entries.end (); ++fd_entry_it) if (fd_entry_it->fd > maxfd) maxfd = fd_entry_it->fd; } retired = true; //标记为需要删除 #endif adjust_load (-1); //fd数量调整 }
poller_t对fd的读写监听是通过这几个函数来操作的:
void set_pollin (handle_t handle_); //监听fd可读状态 void reset_pollin (handle_t handle_); //移除fd监听可读 void set_pollout (handle_t handle_); //监听fd可写状态 void reset_pollout (handle_t handle_); //移除fd监听可写
poller_t继承自poller_base_t,含有定时器集合:
// Clock instance private to this I/O thread. clock_t clock; // List of active timers. struct timer_info_t { zmq::i_poll_events *sink; int id; }; typedef std::multimap <uint64_t, timer_info_t> timers_t; timers_t timers;
定时器集合timers_t是用std:multimap容器,能保证timer的重复键值并有序.处理timer事件也很简洁,从最小时间值的元素开始与当前时间戳比较一下,大于当前时间就是timer时间到来,此时执行timer事件处理,处理完后从定时器集合移除.
uint64_t zmq::poller_base_t::execute_timers () { // Fast track. if (timers.empty ()) return 0; // Get the current time. uint64_t current = clock.now_ms (); // Execute the timers that are already due. timers_t::iterator it = timers.begin (); while (it != timers.end ()) { if (it->first > current) return it->first - current; // Trigger the timer. it->second.sink->timer_event (it->second.id); // Remove it from the list of active timers. timers_t::iterator o = it; ++it; timers.erase (o); } // There are no more timers. return 0; }
3. I/O线程的循环函数
循环里做了三件事情:
1.执行已注册的定时器
2.对fds_set的read/write/error的fd集合进行select,并处理各个fd发生的事件.
3.从事件集合fd_entries里移除已经标记为retired_fd的事件对象.
void zmq::select_t::loop () { while (!stopping) { // Execute any due timers. int timeout = (int) execute_timers (); int rc = 0; #if defined ZMQ_HAVE_WINDOWS ...... #else fds_set_t local_fds_set = fds_set; rc = select (maxfd + 1, &local_fds_set.read, &local_fds_set.write, &local_fds_set.error, timeout ? &tv : NULL); if (rc == -1) { errno_assert (errno == EINTR); continue; } // Size is cached to avoid iteration through just added descriptors. for (fd_entries_t::size_type i = 0, size = fd_entries.size (); i < size && rc > 0; ++i) { fd_entry_t& fd_entry = fd_entries [i]; ...... if (FD_ISSET (fd_entry.fd, &local_fds_set.read)) { fd_entry.events->in_event (); --rc; } ...... if (FD_ISSET (fd_entry.fd, &local_fds_set.write)) { fd_entry.events->out_event (); --rc; }
...... if (FD_ISSET (fd_entry.fd, &local_fds_set.error)) { fd_entry.events->in_event (); --rc; } } if (retired) { //等待select返回并处理完fd的事件后,再统一从fd_entries集合里移除标记为retired_fd的元素 retired = false; fd_entries.erase (std::remove_if (fd_entries.begin (), fd_entries.end (), is_retired_fd), fd_entries.end ()); } #endif } }
以上是关于zmq笔记二: io线程和poller_t的主要内容,如果未能解决你的问题,请参考以下文章