Tornado的异步非阻塞

Posted 青山应回首

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Tornado的异步非阻塞相关的知识,希望对你有一定的参考价值。

阻塞和非阻塞Web框架

只有Tornado和Node.js是异步非阻塞的,其他所有的web框架都是阻塞式的。
Tornado阻塞和非阻塞两种模式都支持。

阻塞式:
	代表:Django、Flask、Tornado、Bottle
	一个请求到来未处理完成,后续请求则一直等待。
	解决方案:多线程或多进程。

异步非阻塞(存在IO请求):
	代表:Tornado(默认单进程/单线程)

Tornado的阻塞模式示例

from tornado import ioloop
from tornado.web import RequestHandler,Application

class IndexHandler(RequestHandler):
    def get(self):
        print("开始")
        import time
        time.sleep(10)
        self.write("Hello, world")
        print("结束")

application = Application([
    (r"/index", IndexHandler,{},\'alias_name1\'),
])

if __name__ == "__main__":
    # 单进程
    application.listen(80)
    ioloop.IOLoop.instance().start()

    # 多进程
    # from tornado.httpserver import HTTPServer
    # server = HTTPServer(application)
    # server.bind(8888)
    # server.start(4)  # Forks multiple sub-processes
    # ioloop.IOLoop.current().start()

Tornado 异步非阻塞示例1

注意:这个在window上跑有问题,还是阻塞的,和 time.time()有关?

from tornado import ioloop
from tornado.web import RequestHandler,Application
from tornado import gen
from tornado.concurrent import Future
import time

class IndexHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        print("开始")
        future = Future()
        ioloop.IOLoop.current().add_timeout(time.time() + 10, self.doing)
        yield future

    def doing(self, *args, **kwargs):
        self.write(\'async\')
        self.finish()
        print("结束")

application = Application([
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(8000)
    ioloop.IOLoop.instance().start()

Tornado 异步非阻塞示例2

from tornado import ioloop
from tornado.web import RequestHandler,Application
from tornado import gen
from tornado import httpclient

class IndexHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        print("开始")
        http = httpclient.AsyncHTTPClient()
        yield http.fetch("https://github.com/", self.done)

    def done(self, response):
        print(response)
        self.write(\'执行成功\')
        self.finish()
        print("结束")

application = Application([
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    # 单进程
    application.listen(9000)
    ioloop.IOLoop.instance().start()

装饰器 + Future 实现Tornado的异步非阻塞示例一

from tornado import ioloop
from tornado.web import RequestHandler,Application
from tornado import gen
from tornado.concurrent import Future

fu = None

class IndexHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        print("I am coming...")
        global fu
        fu = Future()
        fu.add_done_callback(self.done)
        yield fu

    def done(self, response):
        print(response.__dict__[\'_result\'])
        self.write("You made it.")
        self.finish()

class HelpHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        global fu
        if isinstance(fu,Future):
            fu.set_result("Just stick to it.")
            self.write("Gave him a favor.")
        else:
            self.write("He already made it, don\'t worry.")
        self.finish()


application = Application([
    (r"/index", IndexHandler),
    (r"/help", HelpHandler),
])

if __name__ == "__main__":
    application.listen(80)
    ioloop.IOLoop.instance().start()

装饰器 + Future 实现Tornado的异步非阻塞示例二

from tornado import ioloop
from tornado.web import RequestHandler,Application
from tornado import gen
from tornado.concurrent import Future
from threading import Thread
import time

def waiting(future):
    time.sleep(5)
    future.set_result(\'Hey man.\')

class IndexHandler(RequestHandler):
    @gen.coroutine
    def get(self):
        fu = Future()
        fu.add_done_callback(self.done)

        thread = Thread(target=waiting,args=(fu,))
        thread.start()

        yield fu

    def done(self, response):
        print(response.__dict__.get(\'_result\'))
        self.write("See you finally.")
        self.finish()

application = Application([
    (r"/index", IndexHandler),
])

if __name__ == "__main__":
    application.listen(80)
    ioloop.IOLoop.instance().start()

基于异步非阻塞和Tornado-mysql实现用户登录示例  

"""
    需要先安装支持异步操作Mysql的类库: 
    Tornado-MySQL: https://github.com/PyMySQL/Tornado-MySQL#installation
    pip3 install Tornado-MySQL
"""
import tornado.web
from tornado import gen
import tornado_mysql
from tornado_mysql import pools

POOL = pools.Pool(
    dict(host=\'127.0.0.1\', port=3306, user=\'admin\', passwd=\'xxxxxx\', db=\'school\'),
    max_idle_connections=1,
    max_recycle_sec=3)

@gen.coroutine
def get_user_by_conn_pool(user):
    cur = yield POOL.execute("SELECT SLEEP(%s)", (user,))
    row = cur.fetchone()
    raise gen.Return(row)

@gen.coroutine
def get_user(user):
    conn = yield tornado_mysql.connect(host=\'8.8.8.8\', port=3306, user=\'admin\',
                                       passwd=\'xxxxxx\', db=\'school\',charset=\'utf8\')
    cur = conn.cursor()
    # yield cur.execute("SELECT name,age FROM student where name=%s", (user,))
    yield cur.execute("select sleep(10)")
    row = cur.fetchone()
    cur.close()
    conn.close()
    raise gen.Return(row)


class LoginHandler(tornado.web.RequestHandler):
    def get(self, *args, **kwargs):
        self.render(\'login.html\')

    @gen.coroutine
    def post(self, *args, **kwargs):
        user = self.get_argument(\'name\')
        data = yield gen.Task(get_user, user)
        if data:
            print(data)
            self.redirect(\'http://www.baidu.com\')
        else:
            self.render(\'login.html\')

application = tornado.web.Application([
    (r"/login", LoginHandler),
])

if __name__ == "__main__":
    application.listen(8888)
    tornado.ioloop.IOLoop.instance().start()

 

武Sir实现的异步非阻塞的微型Web框架  转载请注明出处

import re
import socket
import select
import time


class HttpResponse(object):
    """
    封装响应信息
    """
    def __init__(self, content=\'\'):
        self.content = content

        self.headers = {}
        self.cookies = {}

    def response(self):
        return bytes(self.content, encoding=\'utf-8\')


class HttpNotFound(HttpResponse):
    """
    404时的错误提示
    """
    def __init__(self):
        super(HttpNotFound, self).__init__(\'404 Not Found\')


class HttpRequest(object):
    """
    用户封装用户请求信息
    """
    def __init__(self, conn):
        self.conn = conn

        self.header_bytes = bytes()
        self.header_dict = {}
        self.body_bytes = bytes()

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):

        header_flag = False
        while True:
            try:
                received = self.conn.recv(8096)
            except Exception as e:
                received = None
            if not received:
                break
            if header_flag:
                self.body_bytes += received
                continue
            temp = received.split(b\'\\r\\n\\r\\n\', 1)
            if len(temp) == 1:
                self.header_bytes += temp
            else:
                h, b = temp
                self.header_bytes += h
                self.body_bytes += b
                header_flag = True

    @property
    def header_str(self):
        return str(self.header_bytes, encoding=\'utf-8\')

    def initialize_headers(self):
        headers = self.header_str.split(\'\\r\\n\')
        first_line = headers[0].split(\' \')
        if len(first_line) == 3:
            self.method, self.url, self.protocol = headers[0].split(\' \')
            for line in headers:
                kv = line.split(\':\')
                if len(kv) == 2:
                    k, v = kv
                    self.header_dict[k] = v


class Future(object):
    """
    异步非阻塞模式时封装回调函数以及是否准备就绪
    """
    def __init__(self, callback):
        self.callback = callback
        self._ready = False
        self.value = None

    def set_result(self, value=None):
        self.value = value
        self._ready = True

    @property
    def ready(self):
        return self._ready


class TimeoutFuture(Future):
    """
    异步非阻塞超时
    """
    def __init__(self, timeout):
        super(TimeoutFuture, self).__init__(callback=None)
        self.timeout = timeout
        self.start_time = time.time()

    @property
    def ready(self):
        current_time = time.time()
        if current_time > self.start_time + self.timeout:
            self._ready = True
        return self._ready


class Snow(object):
    """
    微型Web框架类
    """
    def __init__(self, routes):
        self.routes = routes
        self.inputs = set()
        self.request = None
        self.async_request_handler = {}

    def run(self, host=\'localhost\', port=9999):
        """
        事件循环
        :param host:
        :param port:
        :return:
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
        sock.bind((host, port,))
        sock.setblocking(False)
        sock.listen(128)
        sock.setblocking(0)
        self.inputs.add(sock)
        try:
            while True:
                readable_list, writeable_list, error_list = select.select(self.inputs, [], self.inputs,0.005)
                for conn in readable_list:
                    if sock == conn:
                        client, address = conn.accept()
                        client.setblocking(False)
                        self.inputs.add(client)
                    else:
                        gen = self.process(conn)
                        if isinstance(gen, HttpResponse):
                            conn.sendall(gen.response())
                            self.inputs.remove(conn)
                            conn.close()
                        else:
                            yielded = next(gen)
                            self.async_request_handler[conn] = yielded
                self.polling_callback()

        except Exception as e:
            pass
        finally:
            sock.close()

    def polling_callback(self):
        """
        遍历触发异步非阻塞的回调函数
        :return:
        """
        for conn in list(self.async_request_handler.keys()):
            yielded = self.async_request_handler[conn]
            if not yielded.ready:
                continue
            if yielded.callback:
                ret = yielded.callback(self.request, yielded)
                conn.sendall(ret.response())
            self.inputs.remove(conn)
            del self.async_request_handler[conn]
            conn.close()

    def process(self, conn):
        """
        处理路由系统以及执行函数
        :param conn:
        :return:
        """
        self.request = HttpRequest(conn)
        func = None
        for route in self.routes:
            if re.match(route[0], self.request.url):
                func = route[1]
                break
        if not func:
            return HttpNotFound()
        else:
            return func(self.request)

阻塞模式:基本使用

from snow import Snow
from snow import HttpResponse
 
 
def index(request):
    return HttpResponse(\'OK\')
 
 
routes = [
    (r\'/index/\', index),
]
 
app = Snow(routes)
app.run(port=8012)

异步非阻塞:超时

from snow import Snow
from snow import HttpResponse
from snow import TimeoutFuture
 
request_list = []
 
 
def async(request):
    obj = TimeoutFuture(5)
    yield obj
 
 
def home(request):
    return HttpResponse(\'home\')
 
 
routes = [
    (r\'/home/\', home),
    (r\'/async/\', async),
]
 
app = Snow(routes)
app.run(port=8012)

异步非阻塞:等待

from snow import Snow
from snow import HttpResponse
from snow import Future

request_list = []


def callback(request, future):
    return HttpResponse(future.value)


def req(request):
    obj = Future(callback=callback)
    request_list.append(obj)
    yield obj


def stop(request):
    obj = request_list[0]
    del request_list[0]
    obj.set_result(\'done\')
    return HttpResponse(\'stop\')


routes = [
    (r\'/req/\', req),
    (r\'/stop/\', stop),
]

app = Snow(routes)
app.run(port=8012)

以上是关于Tornado的异步非阻塞的主要内容,如果未能解决你的问题,请参考以下文章

Python web框架 Tornado异步非阻塞

Tornado 异步非阻塞

python---tornado补充(异步非阻塞)

初始Tornado异步非阻塞

Tornado异步非阻塞的使用以及原理

利用tornado使请求实现异步非阻塞