python---tornado补充(异步非阻塞)

Posted 山上有风景

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python---tornado补充(异步非阻塞)相关的知识,希望对你有一定的参考价值。

一:正常访问(同一线程中多个请求是同步阻塞状态)

import tornado.ioloop
import tornado.web
import tornado.websocket
import datetime,time

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("main")class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        time.sleep(10)
        self.write("index")

st ={
    "template_path": "template",#模板路径配置
    "static_path":\'static\',
}

#路由映射   匹配执行,否则404
application = tornado.web.Application([
    ("/main",MainHandler),
    ("/index",IndexHandler),
],**st)

if __name__=="__main__":
    application.listen(8080)

    #io多路复用
    tornado.ioloop.IOLoop.instance().start()

我们先访问index,再去访问main,查看情况

二:使用future模块,实现异步非阻塞

import tornado.ioloop
import tornado.web
import tornado.websocket
import time
from tornado.concurrent import Future

class MainHandler(tornado.web.RequestHandler):
    def get(self):

        self.write("main")class IndexHandler(tornado.web.RequestHandler):
    def get(self):
        future = Future()
        tornado.ioloop.IOLoop.current().add_timeout(time.time()+5,self.done)    #会在结束后为future中result赋值
        yield future

    def done(self,*args,**kwargs):
        self.write("index")
        self.finish()  #关闭请求连接,必须在回调中完成

st ={
    "template_path": "template",#模板路径配置
    "static_path":\'static\',
}

#路由映射   匹配执行,否则404
application = tornado.web.Application([
    ("/main",MainHandler),
    ("/index",IndexHandler),
],**st)

if __name__=="__main__":
    application.listen(8080)

    #io多路复用
    tornado.ioloop.IOLoop.instance().start()

三:在tornado中使用异步IO请求模块

import tornado.ioloop
import tornado.web
import tornado.websocket
import time
from tornado.concurrent import Future
from tornado import httpclient
from tornado import gen

class MainHandler(tornado.web.RequestHandler):
    def get(self):

        self.write("main")

    def post(self, *args, **kwargs):
        pass

class IndexHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        http = httpclient.AsyncHTTPClient()
        yield http.fetch("http://www.google.com",self.done)

    def done(self):
        self.write("index")
        self.finish()

    def post(self, *args, **kwargs):
        pass

st ={
    "template_path": "template",#模板路径配置
    "static_path":\'static\',
}

#路由映射   匹配执行,否则404
application = tornado.web.Application([
    ("/main",MainHandler),
    ("/index",IndexHandler),
],**st)

if __name__=="__main__":
    application.listen(8080)

    #io多路复用
    tornado.ioloop.IOLoop.instance().start()

四:请求间交互,使用future

import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado.concurrent import Future
from tornado import gen

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        self.write("main")


class IndexHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        future = Future()
        future.add_done_callback(self.done)
        yield future    #由于future中的result中值一直未被赋值,所有客户端一直等待

    def done(self,*args,**kwargs):
        self.write("index")
        self.finish()

st ={
    "template_path": "template",#模板路径配置
    "static_path":\'static\',
}

#路由映射   匹配执行,否则404
application = tornado.web.Application([
    ("/main",MainHandler),
    ("/index",IndexHandler),
],**st)

if __name__=="__main__":
    application.listen(8080)

    #io多路复用
    tornado.ioloop.IOLoop.instance().start()

 

 我们可以在另一个请求中去为这个future中result赋值,使当前请求返回

 

import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado.concurrent import Future
from tornado import gen

future = None

class MainHandler(tornado.web.RequestHandler):
    def get(self):
        global future
        future.set_result(None)  #为Future中result赋值
        self.write("main")


class IndexHandler(tornado.web.RequestHandler):
    @gen.coroutine
    def get(self):
        global future
        future = Future()
        future.add_done_callback(self.done)
        yield future    #由于future中的result中值一直未被赋值,所有客户端一直等待

    def done(self,*args,**kwargs):
        self.write("index")
        self.finish()

st ={
    "template_path": "template",#模板路径配置
    "static_path":\'static\',
}

#路由映射   匹配执行,否则404
application = tornado.web.Application([
    ("/main",MainHandler),
    ("/index",IndexHandler),
],**st)

if __name__=="__main__":
    application.listen(8080)

    #io多路复用
    tornado.ioloop.IOLoop.instance().start()

 五:自定义web框架(同步)

# coding:utf8
# __author:  Administrator
# date:      2018/6/30 0030
# /usr/bin/env python
import socket
from select import select
import re

class HttpResponse(object):
    """
    封装响应信息
    """
    def __init__(self, content=\'\'):
        self.content = content
        \'\'\'

        \'\'\'
        self.status = "HTTP/1.1 200 OK"
        self.headers = {}
        self.cookies = {}

        self.initResponseHeader()

    def changeStatus(self,status_code,status_desc):
        self.status = "HTTP/1.1 %s %s"%(status_code,status_desc)

    def initResponseHeader(self):
        self.headers[\'Content-Type\']=\'text/html; charset=utf-8\'
        self.headers[\'X-Frame-Options\']=\'SAMEORIGIN\'
        self.headers[\'X-UA-Compatible\']=\'IE=10\'
        self.headers[\'Cache-Control\']=\'private, max-age=10\'
        self.headers[\'Vary\']=\'Accept-Encoding\'
        self.headers[\'Connection\']=\'keep-alive\'

    def response(self):
        resp_content = None
        header_list = [self.status,]
        for item in self.headers.items():
            header_list.append("%s: %s"%(item[0],item[1]))

        header_str = "\\r\\n".join(header_list)
        resp_content = "\\r\\n\\r\\n".join([header_str,self.content])
        return bytes(resp_content, encoding=\'utf-8\')

class HttpRequest:
    def __init__(self,content):
        """content:用户传递的请求头信息,字节型"""
        self.content = content
        self.header_bytes = bytes()
        self.body_bytes = bytes()

        self.header_str = ""
        self.body_str = ""

        self.header_dict = {}

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        data = self.content.split(b"\\r\\n\\r\\n",1)
        if len(data) == 1:  #全是请求头
            self.header_bytes = self.content
        else:   #含有请求头和请求体
            self.header_bytes,self.body_bytes = data
        self.header_str = str(self.header_bytes,encoding="utf-8")
        self.body_str = str(self.body_bytes,encoding="utf-8")

    def initialize_headers(self):
        headers = self.header_str.split("\\r\\n")
        first_line = headers[0].split(" ")
        if len(first_line) == 3:
            self.method,self.url,self.protocol = first_line
        for line in headers[1:]:
            k_v = line.split(":",1)
            if len(k_v) == 2:
                k,v = k_v
                self.header_dict[k] = v


def main(request):
    return "main"

def index(request):
    return "index"

routers = [
    ("/main/",main),
    (\'/index/\',index),
]

def run():
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((\'127.0.0.1\', 8080))
    sock.listen(128)
    sock.setblocking(False)

    inputs = []
    inputs.append(sock)
    while True:
        rlist, wlist, elist = select(inputs, [], [], 0.05)  # http是单向的,我们只获取请求即可
        for r in rlist:
            if r == sock:  # 有新的请求到来
                conn, addr = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
            else:  # 客户端请求数据
                data = b""
                # 开始获取请求头
                while True:
                    try:
                        chunk = r.recv(1024)
                        data += chunk
                    except BlockingIOError as e:
                        chunk = None
                    if not chunk:
                        break

                # 处理请求头,请求体
                request = HttpRequest(data)
                #1.获取url
                #2.路由匹配
                #3.执行函数,获取返回值
                #4.将返回值发送
                flag = False
                func = None
                for route in routers:
                    if re.match(route[0],request.url):
                        flag = True
                        func = route[1]
                        break
                if flag:
                    result = func(request)
                    response = HttpResponse(result)
                    r.sendall(response.response())
                else:
                    response = HttpResponse("Not Found")
                    response.changeStatus(404,"Not Page")
                    r.sendall(response.response())
                inputs.remove(r)
                r.close()


if __name__ == "__main__":
    run()

 

 未实现异步非阻塞

 六:完善自定义web框架(异步)

import socket
from select import select
import re,time

class HttpResponse(object):
    """
    封装响应信息
    """
    def __init__(self, content=\'\'):
        self.content = content
        \'\'\'

        \'\'\'
        self.status = "HTTP/1.1 200 OK"
        self.headers = {}
        self.cookies = {}

        self.initResponseHeader()

    def changeStatus(self,status_code,status_desc):
        self.status = "HTTP/1.1 %s %s"%(status_code,status_desc)

    def initResponseHeader(self):
        self.headers[\'Content-Type\']=\'text/html; charset=utf-8\'
        self.headers[\'X-Frame-Options\']=\'SAMEORIGIN\'
        self.headers[\'X-UA-Compatible\']=\'IE=10\'
        self.headers[\'Cache-Control\']=\'private, max-age=10\'
        self.headers[\'Vary\']=\'Accept-Encoding\'
        self.headers[\'Connection\']=\'keep-alive\'

    def response(self):
        resp_content = None
        header_list = [self.status,]
        for item in self.headers.items():
            header_list.append("%s: %s"%(item[0],item[1]))

        header_str = "\\r\\n".join(header_list)
        resp_content = "\\r\\n\\r\\n".join([header_str,self.content])
        return bytes(resp_content, encoding=\'utf-8\')

class HttpRequest:
    def __init__(self,content):
        """content:用户传递的请求头信息,字节型"""
        self.content = content
        self.header_bytes = bytes()
        self.body_bytes = bytes()

        self.header_str = ""
        self.body_str = ""

        self.header_dict = {}

        self.method = ""
        self.url = ""
        self.protocol = ""

        self.initialize()
        self.initialize_headers()

    def initialize(self):
        data = self.content.split(b"\\r\\n\\r\\n",1)
        if len(data) == 1:  #全是请求头
            self.header_bytes = self.content
        else:   #含有请求头和请求体
            self.header_bytes,self.body_bytes = data
        self.header_str = str(self.header_bytes,encoding="utf-8")
        self.body_str = str(self.body_bytes,encoding="utf-8")

    def initialize_headers(self):
        headers = self.header_str.split("\\r\\n")
        first_line = headers[0].split(" ")
        if len(first_line) == 3:
            self.method,self.url,self.protocol = first_line
        for line in headers[1:]:
            k_v = line.split(":",1)
            if len(k_v) == 2:
                k,v = k_v
                self.header_dict[k] = v

class Future:
    def __init__(self,timeout):
        self.result = None
        self.timeout = timeout
        self.start = time.time()

    def add_callback_done(self,callback,request):
        self.callback = callback
        self.request = request

    def call(self):
        if self.result == "timeout":  #超时就不要去获取页面数据,直接返回超时
            return "timeout"
        if self.result:  #若是没有超时,去获取回调数据
            return self.callback(self.request)

def callback(request):
    print(request)
    return "async main"

f = None

def main(request):
    global f
    f = Future(10)
    f.add_callback_done(callback,request)  #设置回调
    return f

def index(request):
    return "index"

def stop(request):
    if f:
        f.result = True
    return "stop"

routers = [
    ("/main/",main),
    (\'/index/\',index),
    (\'/stop/\', stop),  #用于向future的result赋值
]

def run():
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind((\'127.0.0.1\', 8080))
    sock.listen(128)
    sock.setblocking(False)

    inputs = []
    async_request_dict = {}
    inputs.append(sock)
    while True:
        rlist, wlist, elist = select(inputs, [], [], 0.05)  # http是单向的,我们只获取请求即可
        for r in rlist:
            if r == sock:  # 有新的请求到来
                conn, addr = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
            else:  # 客户端请求数据
                data = b""
                # 开始获取请求头
                while True:
                    try:
                        chunk = r.recv(1024)
                        data += chunk
                    except BlockingIOError as e:
                        chunk = None
                    if not chunk:
                        break

                # 处理请求头,请求体
                request = HttpRequest(data)
                #1.获取url
                #2.路由匹配
                #3.执行函数,获取返回值
                #4.将返回值发送
                flag = False
                func = None
                for route in routers:
                    if re.match(route[0],request.url):
                        flag = True
                        func = route[1]
                        break
                if flag:
                    result = func(request)
                    if isinstance(result,Future):  #对于future对象,我们另外做异步处理,不阻塞当前操作
                        async_request_dict[r] = result
                        continue
                    response = HttpResponse(result)
                    r.sendall(response.response())
                else:
                    response = HttpResponse("Not Found")
                    response.changeStatus(404,"Not Page")
                    r.sendall(response.response())
                inputs.remove(r)
                r.close()

        for conn in list(async_request_dict.keys()):  #另外对future对象处理
            future = async_request_dict[conn]
            start = future.start
            timeout = future.timeout
            if (start+timeout) <= time.time():  #超时检测
                future.result = "timeout"
            if future.result:
                response = HttpResponse(future.call())  #获取回调数据
                conn.sendall(response.response())
                conn.close()
                del async_request_dict[conn]  #删除字典中这个链接,和下面inputs列表中链接
                inputs.remove(conn)

if __name__ == "__main__":
    run()

 

 

 

 

以上是关于python---tornado补充(异步非阻塞)的主要内容,如果未能解决你的问题,请参考以下文章

Python----Tornado web框架

Python Tornado篇

Python Day10(补充)

Python Tornado搭建高并发Restful API接口服务

带你整理面试过程中关于IO 模型的相关知识

Python Tornado Websocket 处理程序在接收数据时阻塞