python---tornado补充(异步非阻塞)
Posted 山上有风景
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python---tornado补充(异步非阻塞)相关的知识,希望对你有一定的参考价值。
一:正常访问(同一线程中多个请求是同步阻塞状态)
"""Baseline Tornado app: one blocking handler stalls every other request."""
import datetime
import time

import tornado.ioloop
import tornado.web
import tornado.websocket


class MainHandler(tornado.web.RequestHandler):
    """Answer GET /main immediately with a fixed body."""

    def get(self):
        self.write("main")


class IndexHandler(tornado.web.RequestHandler):
    """Answer GET /index after a 10-second blocking sleep."""

    def get(self):
        # time.sleep() blocks the calling thread; this is the synchronous
        # behaviour the demo is meant to expose.
        time.sleep(10)
        self.write("index")


# Application settings passed to tornado.web.Application as keyword arguments.
st = {
    "template_path": "template",  # template directory
    "static_path": "static",
}

# Route table: the first matching pattern handles the request, otherwise 404.
application = tornado.web.Application(
    [
        ("/main", MainHandler),
        ("/index", IndexHandler),
    ],
    **st
)

if __name__ == "__main__":
    application.listen(8080)
    # Start the I/O-multiplexing event loop.
    tornado.ioloop.IOLoop.instance().start()
我们先访问 /index,再立刻访问 /main,查看情况:/main 要等 /index 中的 time.sleep(10) 结束后才会返回,说明同一线程中的请求是同步阻塞的。
二:使用future模块,实现异步非阻塞
"""Tornado app whose /index handler waits on a Future without blocking."""
import tornado.ioloop
import tornado.web
import tornado.websocket
import time
from tornado import gen
from tornado.concurrent import Future


class MainHandler(tornado.web.RequestHandler):
    """Answer GET /main immediately with a fixed body."""

    def get(self):
        self.write("main")


class IndexHandler(tornado.web.RequestHandler):
    """Answer GET /index five seconds later via an IOLoop timeout."""

    # A handler method that yields must be wrapped in gen.coroutine;
    # otherwise calling it only creates a generator and the body never runs.
    @gen.coroutine
    def get(self):
        future = Future()
        # Schedule done() on the IOLoop in 5 seconds and hand it the future so
        # it can resolve it; the loop keeps serving other requests meanwhile.
        tornado.ioloop.IOLoop.current().add_timeout(
            time.time() + 5, self.done, future
        )
        yield future

    def done(self, future=None, *args, **kwargs):
        """Write the response, close the request and release the coroutine.

        :param future: the Future that ``get`` is waiting on, if any.
        """
        self.write("index")
        self.finish()  # the request must be closed from within the callback
        # Resolve the future so the suspended coroutine in get() can return.
        if future is not None and not future.done():
            future.set_result(None)


# Application settings passed to tornado.web.Application as keyword arguments.
st = {
    "template_path": "template",  # template directory
    "static_path": 'static',
}

# Route table: the first matching pattern handles the request, otherwise 404.
application = tornado.web.Application([
    ("/main", MainHandler),
    ("/index", IndexHandler),
], **st)

if __name__ == "__main__":
    application.listen(8080)
    # Start the I/O-multiplexing event loop.
    tornado.ioloop.IOLoop.instance().start()
三:在tornado中使用异步IO请求模块
"""Tornado app whose /index handler performs a non-blocking HTTP fetch."""
import tornado.ioloop
import tornado.web
import tornado.websocket
import time
from tornado.concurrent import Future
from tornado import httpclient
from tornado import gen


class MainHandler(tornado.web.RequestHandler):
    """Answer GET /main immediately; POST is a no-op."""

    def get(self):
        self.write("main")

    def post(self, *args, **kwargs):
        pass


class IndexHandler(tornado.web.RequestHandler):
    """Answer GET /index once an outbound HTTP request has completed."""

    @gen.coroutine
    def get(self):
        http = httpclient.AsyncHTTPClient()
        # fetch() returns a Future; yielding it suspends only this request.
        # The result is yielded rather than delivered through a callback
        # argument, because that argument was removed in Tornado 6.
        response = yield http.fetch("http://www.google.com")
        self.done(response)

    def done(self, response=None):
        """Write the reply and close the request.

        :param response: the fetched HTTPResponse (currently unused).
        """
        self.write("index")
        self.finish()

    def post(self, *args, **kwargs):
        pass


# Application settings passed to tornado.web.Application as keyword arguments.
st = {
    "template_path": "template",  # template directory
    "static_path": 'static',
}

# Route table: the first matching pattern handles the request, otherwise 404.
application = tornado.web.Application([
    ("/main", MainHandler),
    ("/index", IndexHandler),
], **st)

if __name__ == "__main__":
    application.listen(8080)
    # Start the I/O-multiplexing event loop.
    tornado.ioloop.IOLoop.instance().start()
四:请求间交互,使用future
"""Tornado app whose /index request waits on a Future nobody resolves."""
import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado.concurrent import Future
from tornado import gen


class MainHandler(tornado.web.RequestHandler):
    """Answer GET /main immediately with a fixed body."""

    def get(self):
        self.write("main")


class IndexHandler(tornado.web.RequestHandler):
    """Park GET /index on a Future; done() replies once it is resolved."""

    @gen.coroutine
    def get(self):
        future = Future()
        future.add_done_callback(self.done)
        # Nothing in this script ever sets the future's result, so the
        # client keeps waiting.
        yield future

    def done(self, *args, **kwargs):
        self.write("index")
        self.finish()


# Application settings passed to tornado.web.Application as keyword arguments.
st = {
    "template_path": "template",  # template directory
    "static_path": "static",
}

# Route table: the first matching pattern handles the request, otherwise 404.
application = tornado.web.Application(
    [
        ("/main", MainHandler),
        ("/index", IndexHandler),
    ],
    **st
)

if __name__ == "__main__":
    application.listen(8080)
    # Start the I/O-multiplexing event loop.
    tornado.ioloop.IOLoop.instance().start()
我们可以在另一个请求(/main)中为这个 future 的 result 赋值,使正在等待的 /index 请求返回(需先访问 /index,再访问 /main):
"""Tornado app where a request to /main releases a waiting /index request."""
import tornado.ioloop
import tornado.web
import tornado.websocket
from tornado.concurrent import Future
from tornado import gen

# Future that the most recent /index request is waiting on (None until the
# first /index request arrives).
future = None


class MainHandler(tornado.web.RequestHandler):
    """Answer GET /main and resolve the pending /index future, if any."""

    def get(self):
        global future
        # /main may be requested before any /index request exists, or again
        # after the future was already resolved; only set the result when
        # there is an unresolved future to release.
        if future is not None and not future.done():
            future.set_result(None)
        self.write("main")


class IndexHandler(tornado.web.RequestHandler):
    """Park GET /index on the shared future; done() replies when it resolves."""

    @gen.coroutine
    def get(self):
        global future
        future = Future()
        future.add_done_callback(self.done)
        # The client waits here until another request sets the result.
        yield future

    def done(self, *args, **kwargs):
        self.write("index")
        self.finish()


# Application settings passed to tornado.web.Application as keyword arguments.
st = {
    "template_path": "template",  # template directory
    "static_path": 'static',
}

# Route table: the first matching pattern handles the request, otherwise 404.
application = tornado.web.Application([
    ("/main", MainHandler),
    ("/index", IndexHandler),
], **st)

if __name__ == "__main__":
    application.listen(8080)
    # Start the I/O-multiplexing event loop.
    tornado.ioloop.IOLoop.instance().start()
五:自定义web框架(同步)
# coding:utf8
# __author: Administrator
# date: 2018/6/30 0030
# /usr/bin/env python
"""Minimal synchronous web framework built on a select() loop."""
import socket
from select import select
import re


class HttpResponse(object):
    """Hold the status line, headers and body of one HTTP response."""

    def __init__(self, content=''):
        self.content = content
        self.status = "HTTP/1.1 200 OK"
        self.headers = {}
        self.cookies = {}
        self.initResponseHeader()

    def changeStatus(self, status_code, status_desc):
        """Replace the status line with the given code and reason phrase."""
        self.status = "HTTP/1.1 %s %s" % (status_code, status_desc)

    def initResponseHeader(self):
        """Populate the default response headers."""
        self.headers['Content-Type'] = 'text/html; charset=utf-8'
        self.headers['X-Frame-Options'] = 'SAMEORIGIN'
        self.headers['X-UA-Compatible'] = 'IE=10'
        self.headers['Cache-Control'] = 'private, max-age=10'
        self.headers['Vary'] = 'Accept-Encoding'
        # The server closes the socket after every response, so advertise
        # that instead of keep-alive.
        self.headers['Connection'] = 'close'

    def response(self):
        """Return the full response (status line, headers, body) as bytes."""
        body = bytes(self.content, encoding='utf-8')
        headers = dict(self.headers)
        # Length is measured on the encoded body so multi-byte text is right.
        headers['Content-Length'] = len(body)
        header_list = [self.status]
        header_list.extend("%s: %s" % item for item in headers.items())
        head = "\r\n".join(header_list)
        return bytes(head, encoding='utf-8') + b"\r\n\r\n" + body


class HttpRequest:
    """Parse the raw bytes of one HTTP request."""

    def __init__(self, content):
        """content: raw request bytes received from the client."""
        self.content = content
        self.header_bytes = bytes()
        self.body_bytes = bytes()
        self.header_str = ""
        self.body_str = ""
        self.header_dict = {}
        self.method = ""
        self.url = ""
        self.protocol = ""
        self.initialize()
        self.initialize_headers()

    def initialize(self):
        """Split the raw bytes into header and body parts and decode them."""
        # Without a blank line the whole payload is treated as headers.
        self.header_bytes, _, self.body_bytes = self.content.partition(
            b"\r\n\r\n"
        )
        # errors="replace" keeps a non-UTF-8 payload from raising here.
        self.header_str = str(
            self.header_bytes, encoding="utf-8", errors="replace"
        )
        self.body_str = str(
            self.body_bytes, encoding="utf-8", errors="replace"
        )

    def initialize_headers(self):
        """Fill method/url/protocol and header_dict from the header text."""
        headers = self.header_str.split("\r\n")
        first_line = headers[0].split(" ")
        if len(first_line) == 3:
            self.method, self.url, self.protocol = first_line
        for line in headers[1:]:
            k_v = line.split(":", 1)
            if len(k_v) == 2:
                k, v = k_v
                # Drop the optional whitespace around "Name: value".
                self.header_dict[k.strip()] = v.strip()


def main(request):
    return "main"


def index(request):
    return "index"


# (regex pattern, view function) pairs, tried in order with re.match.
routers = [
    ("/main/", main),
    ('/index/', index),
]


def _read_available(conn):
    """Return every byte currently readable from a non-blocking socket."""
    chunks = []
    while True:
        try:
            chunk = conn.recv(1024)
        except OSError:
            # BlockingIOError (nothing more to read right now) or a reset
            # connection: either way stop reading.
            break
        if not chunk:
            break
        chunks.append(chunk)
    return b"".join(chunks)


def _build_response(request):
    """Run the first view whose pattern matches request.url, else a 404."""
    for pattern, view in routers:
        if re.match(pattern, request.url):
            return HttpResponse(view(request))
    response = HttpResponse("Not Found")
    response.changeStatus(404, "Not Found")
    return response


def run():
    """Serve requests on 127.0.0.1:8080 forever, one at a time."""
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('127.0.0.1', 8080))
    sock.listen(128)
    sock.setblocking(False)

    inputs = [sock]
    while True:
        # Only readability is watched: the reply is written as soon as the
        # request has been read.
        rlist, _, _ = select(inputs, [], [], 0.05)
        for r in rlist:
            if r is sock:
                # New incoming connection.
                conn, addr = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
                continue

            data = _read_available(r)
            # An empty read means the client closed without sending a
            # request; there is nothing to answer.
            if data:
                response = _build_response(HttpRequest(data))
                try:
                    r.sendall(response.response())
                except OSError:
                    # The client went away before the reply was written;
                    # keep serving the other connections.
                    pass
            inputs.remove(r)
            r.close()


if __name__ == "__main__":
    run()
未实现异步非阻塞
六:完善自定义web框架(异步)
"""Minimal select()-based web framework with Future-style deferred replies."""
import socket
from select import select
import re, time


class HttpResponse(object):
    """Hold the status line, headers and body of one HTTP response."""

    def __init__(self, content=''):
        self.content = content
        self.status = "HTTP/1.1 200 OK"
        self.headers = {}
        self.cookies = {}
        self.initResponseHeader()

    def changeStatus(self, status_code, status_desc):
        """Replace the status line with the given code and reason phrase."""
        self.status = "HTTP/1.1 %s %s" % (status_code, status_desc)

    def initResponseHeader(self):
        """Populate the default response headers."""
        self.headers['Content-Type'] = 'text/html; charset=utf-8'
        self.headers['X-Frame-Options'] = 'SAMEORIGIN'
        self.headers['X-UA-Compatible'] = 'IE=10'
        self.headers['Cache-Control'] = 'private, max-age=10'
        self.headers['Vary'] = 'Accept-Encoding'
        # The server closes the socket after every response, so advertise
        # that instead of keep-alive.
        self.headers['Connection'] = 'close'

    def response(self):
        """Return the full response (status line, headers, body) as bytes."""
        body = bytes(self.content, encoding='utf-8')
        headers = dict(self.headers)
        # Length is measured on the encoded body so multi-byte text is right.
        headers['Content-Length'] = len(body)
        header_list = [self.status]
        header_list.extend("%s: %s" % item for item in headers.items())
        head = "\r\n".join(header_list)
        return bytes(head, encoding='utf-8') + b"\r\n\r\n" + body


class HttpRequest:
    """Parse the raw bytes of one HTTP request."""

    def __init__(self, content):
        """content: raw request bytes received from the client."""
        self.content = content
        self.header_bytes = bytes()
        self.body_bytes = bytes()
        self.header_str = ""
        self.body_str = ""
        self.header_dict = {}
        self.method = ""
        self.url = ""
        self.protocol = ""
        self.initialize()
        self.initialize_headers()

    def initialize(self):
        """Split the raw bytes into header and body parts and decode them."""
        # Without a blank line the whole payload is treated as headers.
        self.header_bytes, _, self.body_bytes = self.content.partition(
            b"\r\n\r\n"
        )
        # errors="replace" keeps a non-UTF-8 payload from raising here.
        self.header_str = str(
            self.header_bytes, encoding="utf-8", errors="replace"
        )
        self.body_str = str(
            self.body_bytes, encoding="utf-8", errors="replace"
        )

    def initialize_headers(self):
        """Fill method/url/protocol and header_dict from the header text."""
        headers = self.header_str.split("\r\n")
        first_line = headers[0].split(" ")
        if len(first_line) == 3:
            self.method, self.url, self.protocol = first_line
        for line in headers[1:]:
            k_v = line.split(":", 1)
            if len(k_v) == 2:
                k, v = k_v
                # Drop the optional whitespace around "Name: value".
                self.header_dict[k.strip()] = v.strip()


class Future:
    """Placeholder for a reply that is produced later or times out."""

    def __init__(self, timeout):
        self.result = None          # falsy until resolved; "timeout" on expiry
        self.timeout = timeout      # seconds to wait before giving up
        self.start = time.time()    # creation time, used for the expiry check
        self.callback = None
        self.request = None

    def add_callback_done(self, callback, request):
        """Register the function (and its argument) that builds the reply."""
        self.callback = callback
        self.request = request

    def call(self):
        """Return the reply body, or None while the future is unresolved."""
        if self.result == "timeout":
            # Timed out: skip the callback and report the timeout directly.
            return "timeout"
        if self.result:
            # Resolved in time: let the callback produce the page content.
            return self.callback(self.request)
        return None


def callback(request):
    print(request)
    return "async main"


# Future created by the most recent /main/ request (None until then).
f = None


def main(request):
    global f
    f = Future(10)
    f.add_callback_done(callback, request)  # register the reply builder
    return f


def index(request):
    return "index"


def stop(request):
    if f:
        f.result = True
    return "stop"


# (regex pattern, view function) pairs, tried in order with re.match.
routers = [
    ("/main/", main),
    ('/index/', index),
    ('/stop/', stop),  # resolves the pending future created by /main/
]


def _read_available(conn):
    """Return every byte currently readable from a non-blocking socket."""
    chunks = []
    while True:
        try:
            chunk = conn.recv(1024)
        except OSError:
            # BlockingIOError (nothing more to read right now) or a reset
            # connection: either way stop reading.
            break
        if not chunk:
            break
        chunks.append(chunk)
    return b"".join(chunks)


def _dispatch(request):
    """Run the matching view; return its Future or a ready HttpResponse."""
    for pattern, view in routers:
        if re.match(pattern, request.url):
            result = view(request)
            if isinstance(result, Future):
                return result
            return HttpResponse(result)
    response = HttpResponse("Not Found")
    response.changeStatus(404, "Not Found")
    return response


def _send_and_close(conn, response):
    """Write the response to the socket and close it, ignoring a dead peer."""
    try:
        conn.sendall(response.response())
    except OSError:
        # The client went away before the reply was written; keep serving
        # the other connections.
        pass
    conn.close()


def run():
    """Serve requests on 127.0.0.1:8080 forever."""
    sock = socket.socket()
    sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
    sock.bind(('127.0.0.1', 8080))
    sock.listen(128)
    sock.setblocking(False)

    inputs = [sock]
    async_request_dict = {}  # parked connection -> Future it is waiting on

    while True:
        # Only readability is watched: the reply is written as soon as the
        # request has been read (or its future resolves).
        rlist, _, _ = select(inputs, [], [], 0.05)
        for r in rlist:
            if r is sock:
                # New incoming connection.
                conn, addr = sock.accept()
                conn.setblocking(False)
                inputs.append(conn)
                continue

            data = _read_available(r)
            inputs.remove(r)
            if not data:
                # The client closed without sending a request.
                r.close()
                continue

            outcome = _dispatch(HttpRequest(data))
            if isinstance(outcome, Future):
                # Park the connection without blocking the loop. It has
                # already been removed from inputs, so a later readable
                # event cannot answer or close it a second time.
                async_request_dict[r] = outcome
                continue
            _send_and_close(r, outcome)

        # Answer parked connections whose future resolved or expired.
        for conn in list(async_request_dict):
            future = async_request_dict[conn]
            expired = (future.start + future.timeout) <= time.time()
            # Do not overwrite a result that was already set.
            if expired and not future.result:
                future.result = "timeout"
            if future.result:
                _send_and_close(conn, HttpResponse(future.call()))
                del async_request_dict[conn]


if __name__ == "__main__":
    run()
以上是关于python---tornado补充(异步非阻塞)的主要内容,如果未能解决你的问题,请参考以下文章