A Deep Dive into Coroutines in Tornado
Posted MnCu
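Tornado combines a single process (multiple processes are also possible), coroutines, and I/O multiplexing. This avoids the CPU time that a C10K workload otherwise wastes on context switches between too many threads or processes.
Tornado's I/O multiplexing was covered in an earlier post, so it is not explained in detail here.
Let's look at Tornado's coroutine module, tornado.gen:
tornado.gen is built on generators and makes asynchronous code simpler to write.
First, the idea behind tornado.gen.coroutine:
A yield statement in a generator suspends the function, and the send() method resumes it.
Tornado places asynchronous operations after yield. When such an operation completes, Tornado passes its result into the generator with send(), which resumes the function.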
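The suspend/resume mechanism described above can be seen with a plain generator and no Tornado at all. This is a minimal sketch for Python 3; the names worker, requested, and final are made up for illustration.

```python
def worker():
    """A generator that pauses at each yield and resumes on send()."""
    received = []
    first = yield "request-1"    # pause; resumes with the value given to send()
    received.append(first)
    second = yield "request-2"   # pause again
    received.append(second)
    return received              # carried out of the generator by StopIteration.value


gen = worker()
requested = [next(gen)]                  # run up to the first yield
requested.append(gen.send("result-1"))   # resume, run up to the second yield
try:
    gen.send("result-2")                 # resume; the generator finishes
except StopIteration as stop:
    final = stop.value

print(requested)  # ['request-1', 'request-2']
print(final)      # ['result-1', 'result-2']
```

The code calling next() and send() here plays the role that Tornado takes over: it decides when the generator continues and which value each yield expression evaluates to.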
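The official Tornado documentation says:
Most asynchronous functions in Tornado return a Future; yielding this object returns its result.
In other words:
Most asynchronous operations in Tornado return a Future object.
Yielding a Future returns the result of the asynchronous operation. That is, given ret = yield some_future_obj, once the operation behind some_future_obj completes, its result is automatically assigned to ret.
So what exactly is a Future object?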
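I. The Future object
Let's start with the Future object:
A Future can be summed up as a placeholder for an asynchronous operation. It is a somewhat special placeholder, because:
1 The placeholder is an object.
2 The object has a number of attributes, including _result and _callbacks, which store the result of the asynchronous operation and the callback functions respectively.
3 The object has a number of methods, for example to add a callback or to set the result of the asynchronous operation.
4 When the corresponding asynchronous operation completes, _set_done() is called on the object, which iterates over _callbacks and runs each callback.
Here is a simplified version of Future: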
    import logging

    app_log = logging.getLogger("tornado.application")


    class Future(object):
        '''
        A Future mainly holds a list of callbacks (_callbacks) and a result
        (_result). Calling set_result() marks the Future as done, then
        iterates over _callbacks and calls each callback(self).
        '''
        def __init__(self):
            self._done = False      # whether the operation has finished
            self._result = None     # the result of the operation
            self._callbacks = []    # callbacks registered on this future

        def result(self, timeout=None):
            # Return the result of a finished operation.
            # (Exception handling is left out of this simplified version.)
            self._check_done()
            return self._result

        def _check_done(self):
            if not self._done:
                raise Exception("DummyFuture does not support blocking for results")

        def add_done_callback(self, fn):
            if self._done:
                fn(self)
            else:
                self._callbacks.append(fn)

        def set_result(self, result):
            self._result = result
            self._set_done()

        def _set_done(self):
            # What happens once the operation has finished.
            self._done = True
            for cb in self._callbacks:
                try:
                    cb(self)
                except Exception:
                    app_log.exception('Exception in callback %r for %r', cb, self)
            self._callbacks = None
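To make the callback behaviour concrete, here is a short usage sketch. MiniFuture is a hypothetical stand-in written for this example (it is not Tornado's class); it only mirrors the _result / _callbacks / set_result behaviour of the simplified Future.

```python
class MiniFuture:
    """Hypothetical stand-in for the simplified Future (not Tornado's class)."""

    def __init__(self):
        self._done = False
        self._result = None
        self._callbacks = []

    def done(self):
        return self._done

    def result(self):
        if not self._done:
            raise RuntimeError("result is not ready yet")
        return self._result

    def add_done_callback(self, fn):
        if self._done:
            fn(self)                    # already finished: call right away
        else:
            self._callbacks.append(fn)  # otherwise remember it for later

    def set_result(self, result):
        self._result = result
        self._done = True
        callbacks, self._callbacks = self._callbacks, []
        for cb in callbacks:
            cb(self)                    # every callback receives the future itself


seen = []
fut = MiniFuture()
fut.add_done_callback(lambda f: seen.append(("early", f.result())))
pending_before = fut.done()             # False: nothing has completed yet
fut.set_result(42)                      # runs the "early" callback
fut.add_done_callback(lambda f: seen.append(("late", f.result())))
print(seen)  # [('early', 42), ('late', 42)]
```

A callback registered before completion waits in the list; one registered afterwards is called immediately, which matches the two branches of add_done_callback.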
The complete source:
    class Future(object):
        '''
        A Future mainly holds a list of callbacks (_callbacks) and a result
        (_result). Calling set_result() runs the functions in _callbacks.
        '''
        def __init__(self):
            self._done = False           # whether the operation has finished
            self._result = None          # the result of the operation
            self._exc_info = None        # exception info from the operation

            self._log_traceback = False  # Used for Python >= 3.4
            self._tb_logger = None       # Used for Python <= 3.3

            self._callbacks = []         # callbacks registered on this future

        # Implement the Python 3.5 Awaitable protocol if possible
        # (we can't use return and yield together until py33).
        if sys.version_info >= (3, 3):
            exec(textwrap.dedent("""
            def __await__(self):
                return (yield self)
            """))
        else:
            # Py2-compatible version for use with cython.
            def __await__(self):
                result = yield self
                # StopIteration doesn't take args before py33,
                # but Cython recognizes the args tuple.
                e = StopIteration()
                e.args = (result,)
                raise e

        def cancel(self):
            """Cancel the operation, if possible.

            Tornado Futures do not support cancellation, so this always
            returns False.
            """
            return False

        def cancelled(self):
            # Same as above
            return False

        def running(self):
            """Returns True if this operation is currently running."""
            return not self._done

        def done(self):
            """Returns True if the future has finished running."""
            return self._done

        def _clear_tb_log(self):
            self._log_traceback = False
            if self._tb_logger is not None:
                self._tb_logger.clear()
                self._tb_logger = None

        def result(self, timeout=None):
            """If the operation succeeded, return its result.  If it failed,
            re-raise its exception.

            This method takes a ``timeout`` argument for compatibility with
            `concurrent.futures.Future` but it is an error to call it
            before the `Future` is done, so the ``timeout`` is never used.
            """
            self._clear_tb_log()
            if self._result is not None:
                return self._result
            if self._exc_info is not None:
                raise_exc_info(self._exc_info)
            self._check_done()
            return self._result

        def exception(self, timeout=None):
            """If the operation raised an exception, return the `Exception`
            object.  Otherwise returns None.

            This method takes a ``timeout`` argument for compatibility with
            `concurrent.futures.Future` but it is an error to call it
            before the `Future` is done, so the ``timeout`` is never used.
            """
            self._clear_tb_log()
            if self._exc_info is not None:
                return self._exc_info[1]
            else:
                self._check_done()
                return None

        def add_done_callback(self, fn):
            """Attaches the given callback to the `Future`.

            It will be invoked with the `Future` as its argument when the Future
            has finished running and its result is available.  In Tornado
            consider using `.IOLoop.add_future` instead of calling
            `add_done_callback` directly.
            """
            if self._done:
                fn(self)
            else:
                self._callbacks.append(fn)

        def set_result(self, result):
            """Sets the result of a ``Future``.

            It is undefined to call any of the ``set`` methods more than once
            on the same object.
            """
            self._result = result
            self._set_done()

        def set_exception(self, exception):
            """Sets the exception of a ``Future.``"""
            self.set_exc_info(
                (exception.__class__,
                 exception,
                 getattr(exception, '__traceback__', None)))

        def exc_info(self):
            """Returns a tuple in the same format as `sys.exc_info` or None.

            .. versionadded:: 4.0
            """
            self._clear_tb_log()
            return self._exc_info

        def set_exc_info(self, exc_info):
            """Sets the exception information of a ``Future.``

            Preserves tracebacks on Python 2.

            .. versionadded:: 4.0
            """
            self._exc_info = exc_info
            self._log_traceback = True
            if not _GC_CYCLE_FINALIZERS:
                self._tb_logger = _TracebackLogger(exc_info)

            try:
                self._set_done()
            finally:
                # Activate the logger after all callbacks have had a
                # chance to call result() or exception().
                if self._log_traceback and self._tb_logger is not None:
                    self._tb_logger.activate()
            self._exc_info = exc_info

        def _check_done(self):
            if not self._done:
                raise Exception("DummyFuture does not support blocking for results")

        def _set_done(self):
            # What happens once the operation has finished.
            self._done = True
            for cb in self._callbacks:
                try:
                    cb(self)
                except Exception:
                    app_log.exception('Exception in callback %r for %r',
                                      cb, self)
            self._callbacks = None

        # On Python 3.3 or older, objects with a destructor part of a reference
        # cycle are never destroyed. It's no longer the case on Python 3.4 thanks to
        # the PEP 442.
        if _GC_CYCLE_FINALIZERS:
            def __del__(self):
                if not self._log_traceback:
                    # set_exception() was not called, or result() or exception()
                    # has consumed the exception
                    return

                tb = traceback.format_exception(*self._exc_info)

                app_log.error('Future %r exception was never retrieved: %s',
                              self, ''.join(tb).rstrip())
II. The gen.coroutine decorator
Coroutines in Tornado are implemented by the coroutine decorator in tornado.gen:
    def coroutine(func, replace_callback=True):
        return _make_coroutine_wrapper(func, replace_callback=True)
_make_coroutine_wrapper:
    def _make_coroutine_wrapper(func, replace_callback):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            '''
            Overall flow:
                future = TracebackFuture()
                result = func(*args, **kwargs)
                if isinstance(result, GeneratorType):
                    yielded = next(result)
                    Runner(result, future, yielded)
                return future
            '''
            future = TracebackFuture()  # TracebackFuture = Future

            if replace_callback and 'callback' in kwargs:
                callback = kwargs.pop('callback')
                IOLoop.current().add_future(
                    future, lambda future: callback(future.result()))

            try:
                # Call func; if func contains yield, this returns a generator object
                result = func(*args, **kwargs)
            except (Return, StopIteration) as e:
                result = _value_from_stopiteration(e)
            except Exception:
                future.set_exc_info(sys.exc_info())
                return future
            else:
                if isinstance(result, GeneratorType):  # is it a generator object?
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        yielded = next(result)  # first run
                        if stack_context._state.contexts is not orig_stack_contexts:
                            yielded = TracebackFuture()
                            yielded.set_exception(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        future.set_result(_value_from_stopiteration(e))
                    except Exception:
                        future.set_exc_info(sys.exc_info())
                    else:
                        Runner(result, future, yielded)  # Runner(result, future, yield)
                    try:
                        return future
                    finally:
                        future = None
            future.set_result(result)
            return future
        return wrapper
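First, the overall flow:
1 Create a Future object.
2 Run the decorated function and assign what it returns to result. Because Tornado's asynchrony is built on generators, result is normally a generator object.
3 yielded = next(result) runs the decorated function up to its first yield and assigns the yielded value to yielded. In most cases yielded is a Future object.
4 Runner(result, future, yielded)
5 return future
Every step except step 4 is easy to follow, so let's see what Runner() does in step 4: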
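III. The Runner class
1 Why is there a Runner? In other words, what does Runner do?
Runner automatically send()s the result of an asynchronous operation to the point where the generator was suspended.
Tornado's coroutines (its asynchrony) are built on generators, whose two most commonly used methods are send() and next().
Generators are often nested; for example, a generator frequently yields another generator. When generator A yields generator B, two things happen:
1 A is suspended and B runs instead.
2 When B finishes, B's result has to be send() into A at the point where A was suspended, so that A continues.
This is mainly what Runner does: it controls when generators run and pause, and at the right moment wakes up generator A by calling send() with generator B's result.
A simple example: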
    def run():
        print('start running')
        yield 2  # running takes 2 hours

    def eat():
        print('start eating')
        yield 1  # eating takes 1 hour

    def time():
        run_time = yield run()
        eat_time = yield eat()
        print(run_time + eat_time)

    def Runner(gen):
        r = next(gen)
        return r

    t = time()
    try:
        action = t.send(Runner(next(t)))
        t.send(Runner(action))
    except StopIteration:
        pass
The Runner() in the example above only performs the first step; the second step still has to be done by hand. The Runner in tornado.gen handles both.
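For comparison, both steps can be automated in a few lines. This is only a sketch of the idea, not Tornado's Runner: drive is a hypothetical helper, everything runs synchronously, and unlike the example above the inner generators hand their result back with return (Python 3) rather than with yield.

```python
import inspect


def drive(gen):
    """Run a generator to completion and return its return value.

    When the generator yields another generator, that one is driven first
    (step 1) and its return value is sent back in (step 2). Any other
    yielded value is sent back unchanged.
    """
    to_send = None
    while True:
        try:
            yielded = gen.send(to_send)   # send(None) on the first pass acts like next()
        except StopIteration as stop:
            return stop.value
        if inspect.isgenerator(yielded):
            to_send = drive(yielded)
        else:
            to_send = yielded


events = []


def run():
    events.append("start running")
    yield                  # pretend to wait for something
    return 2               # running takes 2 hours


def eat():
    events.append("start eating")
    yield
    return 1               # eating takes 1 hour


def total_time():
    run_time = yield run()
    eat_time = yield eat()
    return run_time + eat_time


total = drive(total_time())
print(events)  # ['start running', 'start eating']
print(total)   # 3
```

The caller no longer calls send() by hand; the driver decides when each generator resumes and with which value.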
2 Dissecting Runner
Runner has three main methods: __init__, handle_yield, and run:
    class Runner(object):
        def __init__(self, gen, result_future, first_yielded):
            self.gen = gen                      # a generator object
            self.result_future = result_future  # a Future object
            # _null_future is a Future that is already done:
            # _null_future = Future(); _null_future.set_result(None)
            self.future = _null_future
            self.yield_point = None
            self.pending_callbacks = None
            self.results = None
            self.running = False
            self.finished = False
            self.had_exception = False
            self.io_loop = IOLoop.current()
            self.stack_context_deactivate = None
            if self.handle_yield(first_yielded):
                self.run()

        # ... some methods omitted ...

        def run(self):
            """Starts or resumes the generator, running until it reaches a
            yield point that is not ready.
            """
            if self.running or self.finished:
                return
            try:
                self.running = True
                while True:
                    future = self.future
                    if not future.done():
                        return
                    self.future = None
                    try:
                        orig_stack_contexts = stack_context._state.contexts
                        exc_info = None

                        try:
                            value = future.result()
                        except Exception:
                            self.had_exception = True
                            exc_info = sys.exc_info()

                        if exc_info is not None:
                            yielded = self.gen.throw(*exc_info)
                            exc_info = None
                        else:
                            yielded = self.gen.send(value)

                        if stack_context._state.contexts is not orig_stack_contexts:
                            self.gen.throw(
                                stack_context.StackContextInconsistentError(
                                    'stack_context inconsistency (probably caused '
                                    'by yield within a "with StackContext" block)'))
                    except (StopIteration, Return) as e:
                        self.finished = True
                        self.future = _null_future
                        if self.pending_callbacks and not self.had_exception:
                            # If we ran cleanly without waiting on all callbacks
                            # raise an error (really more of a warning).  If we
                            # had an exception then some callbacks may have been
                            # orphaned, so skip the check in that case.
                            raise LeakedCallbackError(
                                "finished without waiting for callbacks %r" %
                                self.pending_callbacks)
                        self.result_future.set_result(_value_from_stopiteration(e))
                        self.result_future = None
                        self._deactivate_stack_context()
                        return
                    except Exception:
                        self.finished = True
                        self.future = _null_future
                        self.result_future.set_exc_info(sys.exc_info())
                        self.result_future = None
                        self._deactivate_stack_context()
                        return
                    if not self.handle_yield(yielded):
                        return
            finally:
                self.running = False

        def handle_yield(self, yielded):
            if _contains_yieldpoint(yielded):  # does it contain a YieldPoint?
                yielded = multi(yielded)

            if isinstance(yielded, YieldPoint):
                # Base class for objects that may be yielded from the generator
                self.future = TracebackFuture()  # a freshly created Future

                def start_yield_point():
                    try:
                        yielded.start(self)
                        if yielded.is_ready():
                            self.future.set_result(yielded.get_result())
                        else:
                            self.yield_point = yielded
                    except Exception:
                        self.future = TracebackFuture()
                        self.future.set_exc_info(sys.exc_info())

                if self.stack_context_deactivate is None:
                    with stack_context.ExceptionStackContext(
                            self.handle_exception) as deactivate:
                        self.stack_context_deactivate = deactivate

                        def cb():
                            start_yield_point()
                            self.run()
                        self.io_loop.add_callback(cb)
                        return False
                else:
                    start_yield_point()
            else:
                try:
                    self.future = convert_yielded(yielded)
                except BadYieldError:
                    self.future = TracebackFuture()
                    self.future.set_exc_info(sys.exc_info())

            if not self.future.done() or self.future is moment:  # moment = Future()
                # add a callback to this future
                self.io_loop.add_future(self.future, lambda f: self.run())
                return False
            return True
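2.1 The __init__ method
__init__ performs some initialization. The most important part is its last two lines:
    if self.handle_yield(first_yielded):
        self.run()  # run the generator
2.2 The handle_yield method
As its name suggests, handle_yield(self, yielded) handles the object produced by a yield.
Assume yielded is a Future object (the most common case); the code then shrinks considerably: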
    def handle_yield(self, yielded):
        # If yielded is already a Future, it is returned unchanged
        self.future = convert_yielded(yielded)
        # moment is a Future created when Tornado is initialized,
        # on which set_result(None) has already been called
        if not self.future.done() or self.future is moment:
            # add a callback to this future
            self.io_loop.add_future(self.future, lambda f: self.run())
            return False
        return True
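In other words, it does three things:
First, it resolves self.future.
Then it checks whether self.future is already done. If not, it adds a callback to it, and that callback calls self.run().
Finally, it returns whether self.future is done.
Overall, handle_yield reports whether the yielded object is done; if it is not, handle_yield adds a callback to the yielded object that calls self.run().
One more interesting detail is the add_future call in the code above: self.io_loop.add_future(self.future, lambda f: self.run())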
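    def add_future(self, future, callback):
        # Add a callback to `future`; that callback appends the `callback`
        # argument to self._callbacks.
        # A question to think about: once a Future is done, when should its
        # callbacks run? Immediately, or should they be added to the IOLoop
        # instance's _callbacks and run together?
        # The former is simpler but makes callback execution too chaotic.
        # All callbacks that are ready to run should run together, so the
        # latter is clearly more reasonable, and that is what add_future() does.
        future.add_done_callback(
            lambda future: self.add_callback(callback, future))

    def add_callback(self, callback, *args, **kwargs):
        # Append callback to the _callbacks list
        self._callbacks.append(functools.partial(callback, *args, **kwargs))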
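The effect of queueing the callback instead of calling it directly can be shown with a toy loop. This is a sketch under assumptions: MiniLoop and run_once are hypothetical names, and the standard library's concurrent.futures.Future stands in for Tornado's Future (its set_result() also runs done-callbacks immediately in the calling thread).

```python
import functools
from collections import deque
from concurrent.futures import Future  # stdlib stand-in for Tornado's Future


class MiniLoop:
    """Hypothetical stand-in for IOLoop: nothing but a callback queue."""

    def __init__(self):
        self._callbacks = deque()

    def add_callback(self, callback, *args, **kwargs):
        self._callbacks.append(functools.partial(callback, *args, **kwargs))

    def add_future(self, future, callback):
        # When the future completes, queue callback(future) instead of
        # running it on the spot.
        future.add_done_callback(lambda f: self.add_callback(callback, f))

    def run_once(self):
        """Run the callbacks queued so far; return how many were run."""
        batch, self._callbacks = self._callbacks, deque()
        for cb in batch:
            cb()
        return len(batch)


order = []
loop = MiniLoop()
fut = Future()
loop.add_future(fut, lambda f: order.append("callback got %r" % f.result()))

fut.set_result("data")               # only queues the callback
order.append("set_result returned")
ran = loop.run_once()                # the callback runs here

print(order)  # ['set_result returned', "callback got 'data'"]
print(ran)    # 1
```

The callback runs on the loop's next pass rather than inside set_result(), so the code that completes a Future is never interrupted by the code waiting on it.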
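2.3 The run method
Now the self.run() method. It is essentially a loop that keeps calling the generator's send() method; the value sent is the result of the yielded object.
run() can be simplified as follows: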
    def run(self):
        """Starts or resumes the generator, running until it reaches a
        yield point that is not ready.

        Keeps sending values into the generator until the object produced
        by some yield is not done yet.
        """
        try:
            self.running = True
            while True:
                future = self.future
                if not future.done():
                    return
                self.future = None       # clear self.future
                value = future.result()  # get the future's result
                try:
                    # Send the result; whatever the generator yields next
                    # (usually another future) is assigned to yielded
                    yielded = self.gen.send(value)
                except (StopIteration, Return) as e:
                    self.finished = True
                    self.future = _null_future
                    self.result_future.set_result(_value_from_stopiteration(e))
                    self.result_future = None
                    self._deactivate_stack_context()
                    return
                # Call self.handle_yield(yielded). If yielded is not done,
                # return; otherwise keep looping
                if not self.handle_yield(yielded):
                    return
        finally:
            self.running = False
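Summary:
1 Each Future corresponds to one asynchronous operation.
2 Callbacks can be added to a Future. When the asynchronous operation completes, set_result (which calls _set_done internally) must be called on the Future, which then runs all of its callbacks.
3 Every generator function decorated with coroutine returns a Future object. The decorator also creates a Runner from the generator, the value yielded by the generator's first next(), and that Future.
4 Runner repeatedly calls send() on the generator. Specifically: once the object yielded by the previous next() or send() (usually a Future) is done, Runner send()s its result into the generator. It repeats this until a StopIteration or Return exception is raised (which means the generator has finished), and then calls set_result on the Future that belongs to the generator.
As shown, Tornado's coroutines are built on generators. A generator suspends at the yield keyword and resumes through next() or send(), and send() can also pass a value into the generator.
The Future object is what ties coroutines together. Each Future corresponds to one asynchronous operation, any number of callbacks can be added to it, and calling set_result on it once the operation completes runs those callbacks.
Runner supplies the driving force. It keeps send()ing the result of every Future the generator yields back into the generator. When the generator finishes, Runner wraps things up by calling set_result on the Future that belongs to the generator.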
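The three pieces can be put together in a toy version that runs without Tornado. This is a sketch under stated assumptions, not Tornado's code: MiniLoop, MiniRunner, mini_coroutine, and fetch are hypothetical names; concurrent.futures.Future stands in for Tornado's Future; coroutines may only yield Futures; results come back via return (Python 3); exception propagation and stack contexts are left out.

```python
import functools
import types
from collections import deque
from concurrent.futures import Future  # stdlib stand-in for Tornado's Future


class MiniLoop:
    """Toy event loop: only a queue of callbacks."""

    def __init__(self):
        self._callbacks = deque()

    def add_callback(self, callback, *args):
        self._callbacks.append(functools.partial(callback, *args))

    def add_future(self, future, callback):
        future.add_done_callback(lambda f: self.add_callback(callback, f))

    def run_until_idle(self):
        while self._callbacks:
            self._callbacks.popleft()()


loop = MiniLoop()


class MiniRunner:
    """Feeds the result of each yielded Future back into the generator."""

    def __init__(self, gen, result_future, first_yielded):
        self.gen = gen
        self.result_future = result_future
        self.future = None
        if self.handle_yield(first_yielded):
            self.run()

    def handle_yield(self, yielded):
        self.future = yielded  # assumption: only Futures are yielded
        if not self.future.done():
            loop.add_future(self.future, lambda f: self.run())
            return False
        return True

    def run(self):
        while True:
            future = self.future
            if not future.done():
                return
            try:
                yielded = self.gen.send(future.result())
            except StopIteration as stop:
                self.result_future.set_result(stop.value)  # generator finished
                return
            if not self.handle_yield(yielded):
                return


def mini_coroutine(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        future = Future()                                # step 1
        result = func(*args, **kwargs)                   # step 2
        if isinstance(result, types.GeneratorType):
            try:
                yielded = next(result)                   # step 3
            except StopIteration as stop:
                future.set_result(stop.value)
            else:
                MiniRunner(result, future, yielded)      # step 4
            return future                                # step 5
        future.set_result(result)
        return future
    return wrapper


def fetch(value):
    """Hypothetical async operation, completed later by a loop callback."""
    fut = Future()
    loop.add_callback(fut.set_result, value)
    return fut


@mini_coroutine
def add(a, b):
    x = yield fetch(a)
    y = yield fetch(b)
    return x + y


@mini_coroutine
def main():
    total = yield add(1, 2)  # calling a coroutine returns a Future, so it can be yielded
    return total * 10


outer = main()
done_before = outer.done()   # False: nothing has run on the loop yet
loop.run_until_idle()
print(outer.result())        # 30
```

Calling main() returns immediately with a pending Future; all progress happens inside run_until_idle(), where each completed Future queues the run() call of the runner waiting on it.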