tornado异步原理

Posted longyunfeigu

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了tornado异步原理相关的知识,希望对你有一定的参考价值。

几个概念

协程代码块: 一个入口点和下一个入口点(或者退出点)中的代码。
协程模块: 由n个入口点代码,和n个协程代码块组成。第一个入口点通常是一个函 数入口点。其组织形式如:函数入口点->协程代码块->入口点->协程代码块…,入口点和代码块相间。
线性模块:一个同步函数的函数体是线性执行的。也就是说一个模块中的每一行代码,相继执行,一个模块在执行中,如果还没有执行完毕,不会去执行其他模块的代码。称这样的代码模块为线性模块。

一个协程模块,如果只含有单一入口点和单一协程代码块(假设这个协程代码块全是同步代码),当然这个协程模块是一个线性执行模块,但是如果含有多个入口点和多个协程代码块,那么就不是一个线性模块。那么执行一个协程模块过程实际是分散的(不同的时间段,执行不同的协程代码块,协程代码块的执行时间段,彼此不相交),但也是顺序的(后一个协程代码块在前一个协程代码块执行结束后才执行)。两个属于同一协程模块的相继协程代码块执行的中间时间间隙,可能有很多其他协程模块的协程代码片段在执行。

生成器和yield语义

在pep255中提到了”simple generator”和”yield语句”(此时还不是”yield表达式”)的实现。一个basic idea,提供一种函数,能够返回中间结果给调用者,然后维护函数的局部状态,以便函数当离开后,也能恢复执行

Generator已经具备协程的一些能力。如:能够暂停执行,保存状态;能够恢复执行;能够异步执行。

但是此时Generator还不是一个协程。一个真正的协程能够控制代码什么时候继续执行。而一个Generator执行遇到一个yield表达式 或者语句,会将执行控制权转移给调用者

在维基百科中提到,可以实现一个顶级的调度子例程,将执行控制权转移回Generator,从而让它继续执行。在tornado中,ioLoop就是这样的顶级调度子例程,每个协程模块通过,函数装饰器coroutine和ioLoop进行通信,从而ioLoop可以在协程模块执行暂停后,在合适的时机重新调度协程模块执行。

Future类

Future类位于tornado源码的concurrent模块中。

class Future(object):
    def done(self):
        return self._done

    def result(self, timeout=None):
        # 获取结果
        self._clear_tb_log()
        if self._result is not None:
            return self._result
        if self._exc_info is not None:
            raise_exc_info(self._exc_info)
        self._check_done()
        return self._result

    def add_done_callback(self, fn):
        # 可以先简单地理解为往_callbacks列表添加东西
        if self._done:
            fn(self)
        else:
            self._callbacks.append(fn)

    def set_result(self, result):
        # 比较重要的方法, 设置上值并依次调用_callbacks列表里的函数
        self._result = result
        self._set_done()

    def _set_done(self):
        # 把 self._done 设置成 True。
        self._done = True
        for cb in self._callbacks:
            try:
                cb(self)
            except Exception:
                app_log.exception('exception calling callback %r for %r',
                                  cb, self)
        self._callbacks = None

Future封装了异步操作的结果, 最终希望它被set_result,并且调用一些回调函数(用来驱动协程继续执行)。Future对象实际是coroutine函数装饰器和IOLoop的沟通使者,有着非常重要的作用。

IOLoop类

tornado框架的底层核心类,位于tornado的ioloop模块。IOLoop主要调度处理的是IO事件(如读,写,错误)。除此之外,还能调度callback和timeout事件。

实例分析

import tornado.ioloop
from tornado.gen import coroutine
from tornado.concurrent import Future

@coroutine
def asyn_sum(a, b):
    print("begin calculate:sum %d+%d"%(a,b))
    future = Future()

    def callback(a, b):
        print("calculating the sum of %d+%d:"%(a,b))
        future.set_result(a+b)
    tornado.ioloop.IOLoop.instance().add_callback(callback, a, b)

    result = yield future

    print("after yielded")
    print("the %d+%d=%d"%(a, b, result))

def main():
    asyn_sum(2, 3)
    tornado.ioloop.IOLoop.instance().start()

main()
def coroutine(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        # TODO 这里的 future 是什么作用? 和 yielded 的future 是什么区别?
        future = _create_future()
        try:
            # result 就是得到的协程 gen
            result = func(*args, **kwargs)
        ...
        else:
            if isinstance(result, Generator):
                try:
                    # 在上面的实例中 yielded 就是在 asyn_sum 创建的future
                    yielded = next(result)
                ...
                else:
                    runner = Runner(result, future, yielded)
                    future.add_done_callback(lambda _: runner)
                yielded = None
                try:
                    return future
                finally:
                    future = None  # type: ignore
        future_set_result_unless_cancelled(future, result)
        return future
    wrapper.__wrapped__ = func  # type: ignore
    wrapper.__tornado_coroutine__ = True  # type: ignore
    return wrapper
class Runner(object):
    def __init__(
        self,
        gen,
        result_future,
        first_yielded
    ):
        # 包裹了生成器 asyn_sum, 第一次next的返回以及在coroutine创建的future(result_future结果future)
        self.gen = gen
        self.result_future = result_future
        self.future = _null_future  # type: Union[None, Future]
        self.running = False
        self.finished = False
        self.io_loop = IOLoop.current()
        if self.handle_yield(first_yielded):
            gen = result_future = first_yielded = None  # type: ignore
            self.run()
def handle_yield(self, yielded: _Yieldable) -> bool:
    try:
        # 在这里分清 self.future(asyn_sum返回的future对象) 和 self.result_future (是在coroutine里创建的future对象)。
        self.future = convert_yielded(yielded)
    ...
    if self.future is moment:
        self.io_loop.add_callback(self.run)
        return False
    elif self.future is None:
        raise Exception("no pending future")
    # 检查self.future的状态,因为ioloop还没有进行调度
    elif not self.future.done():
        def inner(f: Any) -> None:
            # Break a reference cycle to speed GC.
            f = None  # noqa: F841
            self.run()
        self.io_loop.add_future(self.future, inner)
        return False
    return True
def add_future(
        self,
        future,
        callback
) -> None:
    if isinstance(future, Future):
        # 给asyn_sum的future添加回调函数
        future.add_done_callback(lambda f: self._run_callback(functools.partial(callback, future)))
    else:
        assert is_future(future)
        future_add_done_callback(
            future, lambda f: self.add_callback(callback, future))

一旦 tornado.ioloop.IOLoop.instance().start() 启动事件循环, 那么ioloop添加的回调函数(tornado.ioloop.IOLoop.instance().add_callback(callback, a, b))都会被执行. 此时future.set_result(a+b)之后就会调用future里设置的callback回调函数,也就是里面设置的inner。

def run(self) -> None:
    if self.running or self.finished:
        return
    try:
        self.running = True
        while True:
            # 这里的 future 就是 aync_sum 里返回的future
            future = self.future
            if future is None:
                raise Exception("No pending future")
            if not future.done():
                return
            self.future = None
            try:
                exc_info = None

                try:
                    value = future.result()
                ...
                else:
                    # 继续执行 asyn_sum 生成器
                    yielded = self.gen.send(value)
            ...
            # 下一次寻找yield的过程
            if not self.handle_yield(yielded):
                return
            yielded = None
    finally:
        self.running = False

这里的Runner可以看做是asynico中驱动gen运行的task。

上面的例子说明了ioloop进行callback的调度,在调度过程中某一个future被设置上result,就会进行这个future的callback调度,在future的这些callback中会有一个内置的类似于step的函数,用于驱动生成器gen的运行。

以上是关于tornado异步原理的主要内容,如果未能解决你的问题,请参考以下文章

tornado异步原理--异步事件

Tornado学习之异步原理

Tornado异步非阻塞的使用以及原理

tornado和异步IO的基本简单流程原理

tornado数据库操作怎么异步

Tornado----自定义异步非阻塞Web框架:Snow