Tornado源码阅读(一) --- IOLoop之创建ioloop
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Tornado源码阅读(一) --- IOLoop之创建ioloop相关的知识,希望对你有一定的参考价值。
参考技术A本文的测试环境是在MacOS,因此使用的多路复用的网络IO为kqueue而不是epoll,对应的IOLoop实例对象也是KQueueIOLoop。
在介绍 Epoll模式 的笔记中,最后写了一个tornado的使用epoll的例子。这个例子是如何工作的呢?下面来读一读tornado的源码。
启动一个tornado server很简单,只需要下面几code:
tornado.ioloop.IOLoop.current() 实际上是创建一个IO循环的对象,这里是KQueueIOLoop,Linux的系统则是EPollIOLoop。
下面是current的源码,该方法目的就是从local线程中获取KQueueIOLoop(如果存在的话,否则则新建一个)
程序首先判断 IOLoop._current对象(_current对象是一个线程local)的instance属性,如果没有current,则调用IOLoop.instance()方法创建一个IOLoop的实例作为currnet返回。由于tornado的包装,实际上IOLoop返回的并不是IOLoop的实例对象,而是KQueueIOLoop实例对象。
为什么IOLoop实例化的对象KQueueIOLoop呢?想知道答案就得揭开IOLoop.instance()神秘面纱,表面上看,该方法创建的IOLoop实例对象,并绑定到IOLoop._instance上。
IOLoop继承自Configurable基类,IOLoop 自身没有常见的初始化"构造函数"( init )。显然需要再查看Configurable基类。不看不知道,一看tornado的作者还真会玩。Configurable是一个设计很精巧的类,通过不同子类的继承来适配。基类在子类创建的时候做一些适配的事情。相比 init , new 称之为构造函数更准确。
IOLoop在创建的时候,通过基类 new 方法调用子类的configurable_base和configurable_default适配不同子类的特性。这里通过IOLoop的configurable_default方法选择了unix系统的kqueue方式。
根据平台确定了impl为kqueue之后,将会通过 new 创建实例对象,就是这一步,创建了KQueueIOLoop而不是IOLoop的对象。Configurable自身不定义initialize。这里就调用了KQueueIOLoop的initialize方法。
KQueueIOLoop的方法很简单,其中实现了一个_KQueue,这个类用于操作unix系统上的kqueue的网络io相关封装,例如注册事件,poll调用等。然后KQueueIOLoop带用其父类(PollIOLoop)的initialize方法。有没有发现,调用的控制权一直在各个父类基类中跳转。大概是 IOLoop -> Configurable -> IOLoop -> KQueueIOLoop -> PollOLoop -> IOLoop -> PolIOLoop。
PollIOLoop继承自IOLoop,PollIOLoop调用其父类的initialize方法。此时调用make_current为None,因此又会调用IOLoop.current()的方法,怎么又是IOLoop.current?我们不就是从客户端逻辑(相对于库)调用这个方法进来的么?注意,不同于第一次客户端调用的时候,当时intances是True。也就是此时直接返回IOLoop._current.instance,前面正是因为current为None,才需要通过IOLoop的创建对象。当然此时current为None,即直接返回None。接下来自然运行make_current方法。
make_current方法干点啥好呢?当然你肯定想到了,既然我们之前IOLoop.current方法是为了获取IOLoop._current.instance,并且一直为None,那么make_current正好填补这个空白,创建一个绑定就好嘛。
的确,make_current把当前的类实例(KQueueIOLoop)创建并绑定。通过前面巧妙的设计,根据平台选择了网络io的模式。接下来还得根据io模式绑定IO监听事件。继续阅读PollIOLoop,可以发现通过add_handler方法喝Waker实现。
add_handler方法处理文件描述符,其中stack_context类通过wrap包装一个上下文类似的东西。具体数据结构没有仔细看,留待日后研究,总而言之,这个方法借助之前的_KQueue类注册网络io事件。
此时,ioloop对象成功的创建。创建ioloop对象之后,server还不回启动,需要调用start启动。在启动之前,也需要通过add_hanndler绑定事件函数。至于start的工作原理,下回再研究。
深入tornado中的ioLoop
本文所剖析的tornado源码版本为4.4.2
ioloop是tornado的关键,是他的最底层。
ioloop就是对I/O多路复用的封装,它实现了一个单例,将这个单例保存在IOLoop._instance中
ioloop实现了Reactor模型,将所有要处理的I/O事件注册到一个中心I/O多路复用器上,同时主线程/进程阻塞在多路复用器上;一旦有I/O事件到来或是准备就绪(文件描述符或socket可读、写),多路复用器返回并将事先注册的相应I/O事件分发到对应的处理器中。
另外,ioloop还被用来集中运行回调函数以及集中处理定时任务。
一 准备知识:
1 首先我们要了解Reactor模型
2 其次,我们要了解I/O多路复用,由于本文假设系统为Linux,所以要了解epoll以及Python中的select模块
3 IOLoop类是Configurable类的子类,而Configurable类是一个工厂类,讲解在这。
二 创建IOLoop实例
来看IOLoop,它的父类是Configurable类,也就是说:IOLoop是一个直属配置子类
class IOLoop(Configurable): ......
这里就要结合Configurable类进行讲解:

1 首先实例化一个该直属配置子类的\'执行类对象\',也就是调用该类的configurable_default方法并返回赋值给impl:
@classmethod def configurable_default(cls): if hasattr(select, "epoll"): # 因为我们假设我们的系统为Linux,且支持epoll,所以这里为True from tornado.platform.epoll import EPollIOLoop return EPollIOLoop if hasattr(select, "kqueue"): # Python 2.6+ on BSD or Mac from tornado.platform.kqueue import KQueueIOLoop return KQueueIOLoop from tornado.platform.select import SelectIOLoop return SelectIOLoop
2 也就是impl是EPollIOLoop类对象,然后实例化该对象,运行其initialize方法
class EPollIOLoop(PollIOLoop): # 该类只有这么短短的几句,可见主要的方法是在其父类PollIOLoop中实现。 def initialize(self, **kwargs): super(EPollIOLoop, self).initialize(impl=select.epoll(), **kwargs) # 执行了父类PollIOLoop的initialize方法,并将select.epoll()传入
来看一看PollIOLoop.initialize(EPollIOLoop(),impl=select.epoll())干了些啥:
class PollIOLoop(IOLoop): # 从属配置子类 def initialize(self, impl, time_func=None, **kwargs): super(PollIOLoop, self).initialize(**kwargs) # 调用IOLoop的initialize方法 self._impl = impl # self._impl = select.epoll() if hasattr(self._impl, \'fileno\'): # 文件描述符的close_on_exec属性 set_close_exec(self._impl.fileno()) self.time_func = time_func or time.time self._handlers = {} # 文件描述符对应的fileno()作为key,(文件描述符对象,处理函数)作为value self._events = {} # 用来存储epoll_obj.poll()返回的事件,也就是哪个fd发生了什么事件{(fd1, event1), (fd2, event2)……} self._callbacks = [] self._callback_lock = threading.Lock() # 添加线程锁 self._timeouts = [] # 存储定时任务 self._cancellations = 0 self._running = False self._stopped = False self._closing = False self._thread_ident = None # 获得当前线程标识符 self._blocking_signal_threshold = None self._timeout_counter = itertools.count() # Create a pipe that we send bogus data to when we want to wake # the I/O loop when it is idle self._waker = Waker() self.add_handler(self._waker.fileno(), lambda fd, events: self._waker.consume(), self.READ)
首先调用了IOLoop.initialize(self,**kwargs)方法:
def initialize(self, make_current=None): if make_current is None: if IOLoop.current(instance=False) is None: self.make_current() elif make_current: if IOLoop.current(instance=False) is not None: raise RuntimeError("current IOLoop already exists") self.make_current()
@staticmethod def current(instance=True): current = getattr(IOLoop._current, "instance", None) if current is None and instance: return IOLoop.instance() return current def make_current(self): IOLoop._current.instance = self
我们可以看到IOLoop.initialize()主要是对线程做了一些支持和操作。
3 返回该实例
三 剖析PollIOLoop
1 处理I/O事件以及其对应handler的相关属性以及方法
使用self._handlers用来存储fd与handler的对应关系,文件描述符对应的fileno()作为key,元组(文件描述符对象,处理函数)作为value
self._events 用来存储epoll_obj.poll()返回的事件,也就是哪个fd发生了什么事件{(fd1, event1), (fd2, event2)……}
add_handler方法用来添加handler
update_handle方法用来更新handler
remove_handler方法用来移除handler
def add_handler(self, fd, handler, events): # 向epoll中注册事件 , 并在self._handlers[fd]中为该文件描述符添加相应处理函数 fd, obj = self.split_fd(fd) # fd.fileno(),fd self._handlers[fd] = (obj, stack_context.wrap(handler)) self._impl.register(fd, events | self.ERROR) def update_handler(self, fd, events): fd, obj = self.split_fd(fd) self._impl.modify(fd, events | self.ERROR) def remove_handler(self, fd): fd, obj = self.split_fd(fd) self._handlers.pop(fd, None) self._events.pop(fd, None) try: self._impl.unregister(fd) except Exception: gen_log.debug("Error deleting fd from IOLoop", exc_info=True)
2 处理回调函数的相关属性以及方法
self._callbacks用来存储回调函数
add_callback方法用来直接添加回调函数
add_future方法用来间接的添加回调函数,future对象详解在这
def add_callback(self, callback, *args, **kwargs): # 因为Python的GIL的限制,导致Python线程并不算高效。加上tornado实现了多进程 + 协程的模式,所以我们略过源码中的部分线程相关的一些操作 if self._closing: return self._callbacks.append(functools.partial(stack_context.wrap(callback), *args, **kwargs)) def add_future(self, future, callback): # 为future对象添加经过包装后的回调函数,该回调函数会在future对象被set_done后添加至_callbacks中 assert is_future(future) callback = stack_context.wrap(callback) future.add_done_callback( lambda future: self.add_callback(callback, future))
3 处理定时任务的相关属性以及方法
self._timeouts用来存储定时任务
self.add_timeout用来添加定时任务(self.call_later self.call_at都是间接调用了该方法)
def add_timeout(self, deadline, callback, *args, **kwargs): """ ``deadline``可能是一个数字,表示相对于当前时间的时间(与“IOLoop.time”通常为“time.time”相同的大小),或者是datetime.timedelta对象。 自从Tornado 4.0以来,`call_later`是一个比较方便的替代方案,因为它不需要timedelta对象。 """ if isinstance(deadline, numbers.Real): return self.call_at(deadline, callback, *args, **kwargs) elif isinstance(deadline, datetime.timedelta): return self.call_at(self.time() + timedelta_to_seconds(deadline), callback, *args, **kwargs) else: raise TypeError("Unsupported deadline %r" % deadline)
4 启动io多路复用器
启动也一般就意味着开始循环,那么循环什么呢?
1 运行回调函数
2 运行时间已到的定时任务
3 当某个文件描述法发生事件时,运行该事件对应的handler
使用start方法启动ioloop,看一下其简化版(去除线程相关,以及一些相对不重要的细节):
def start(self): try: while True: callbacks = self._callbacks self._callbacks = [] due_timeouts = [] # 将时间已到的定时任务放置到due_timeouts中,过程省略 for callback in callbacks: # 执行callback self._run_callback(callback) for timeout in due_timeouts: # 执行定时任务 if timeout.callback is not None: self._run_callback(timeout.callback) callbacks = callback = due_timeouts = timeout = None # 释放内存 # 根据情况设置poll_timeout的值,过程省略 if not self._running: # 终止ioloop运行时,在执行完了callback后结束循环 breaktry: event_pairs = self._impl.poll(poll_timeout) except Exception as e: if errno_from_exception(e) == errno.EINTR: # 系统调用被信号处理函数中断,进行下一次循环 continue else: raise self._events.update(event_pairs) while self._events: fd, events = self._events.popitem() # 获取一个fd以及对应事件 try: fd_obj, handler_func = self._handlers[fd] # 获取该fd对应的事件处理函数 handler_func(fd_obj, events) # 运行该事件处理函数 except (OSError, IOError) as e: if errno_from_exception(e) == errno.EPIPE: # 当客户端关闭连接时会产生EPIPE错误 pass # 其他异常处理已经省略 fd_obj = handler_func = None # 释放内存空间

5 关闭io多路复用器
def close(self, all_fds=False): with self._callback_lock: self._closing = True self.remove_handler(self._waker.fileno()) if all_fds: # 该参数若为True,则表示会关闭所有文件描述符 for fd, handler in self._handlers.values(): self.close_fd(fd) self._waker.close() self._impl.close() self._callbacks = None self._timeouts = None
四 参考
https://zhu327.github.io/2016/06/14/tornado%E4%BB%A3%E7%A0%81%E9%98%85%E8%AF%BB%E7%AC%94%E8%AE%B0-ioloop/
https://www.zhihu.com/question/20021164
http://stackoverflow.com/questions/12179271/meaning-of-classmethod-and-staticmethod-for-beginner/12179752#12179752
http://blog.csdn.net/benkaoya/article/details/17262053
以上是关于Tornado源码阅读(一) --- IOLoop之创建ioloop的主要内容,如果未能解决你的问题,请参考以下文章
tornado的IOLoop.instance()方法和IOLoop.current()方法区别
Tornado 高并发源码分析之四--- HTTPServer 与 TCPServer 对象
python Python中单例模式的线程安全实现。基于tornado.ioloop.IOLoop.instance()方法。