自定义异步IO框架

Posted linyuhong

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义异步IO框架相关的知识,希望对你有一定的参考价值。

异步就是回调

异步 = 非阻塞+循环

select只能完成IO多路复用,不能完成异步

IO多路复用--->监听多个socket对象,这个过程是同步的

利用其特性可以开发异步模块

异步IO:非阻塞的socket + IO多路复用

自定义异步框架

import socket
import select


class HttpRequest(object):
    def __init__(self, sk, host, callback):
        self.socket = sk
        self.host = host
        self.callback = callback
    def fileno(self): # select监听的对象,只要内部有fileno()方法,并且返回fileno
        return self.socket.fileno()

class HttpResponse(object):
    def __init__(self, recv_data):
        self.recv_data = recv_data
        self.header_dict = {}
        self.body = None
        self.initialize()

    def initialize(self):
        headers, body = self.recv_data.split(b

, 1)
        self.body = body
        header_list = headers.split(b
)
        for h in header_list:
            h_str = str(h, encoding=utf-8)
            v = h_str.split(:, 1)
            if len(v) == 2:
                self.header_dict[v[0]] = v[1]


class AsyncRequest(object):
    def __init__(self):
        self.conn = []
        self.connection = []
    def add_request(self, host, callback):
        try:
            sk = socket.socket()
            sk.setblocking(0)
            sk.connect((host, 80),)
        except BlockingIOError as e:
            pass
        # 把sk、host和callback封装起来,返回fd给select
        request = HttpRequest(sk, host, callback)
        self.conn.append(request)
        self.connection.append(request)

    def run(self):
        while True:
            rlist, wlist, elist = select.select(self.conn, self.connection, self.conn, 0.05)
            for w in wlist:
                # 只要能循环到,表示socket和服务端已经连接成功
                print(w.host, 连接成功...)
                tpl = "GET / HTTP/1.0
Host:%s

" % (w.host,)
                w.socket.send(bytes(tpl, encoding=utf-8))
                self.connection.remove(w)
            for r in rlist:
                recv_data = bytes()
                while True:
                    try:
                        chunck = r.socket.recv(8096)
                        recv_data += chunck
                    except Exception as e:
                        break
                # 把返回的数据进行处理,然后交给回调函数
                response = HttpResponse(recv_data)
                r.callback(response)
                r.socket.close()
                self.conn.remove(r)
            if len(self.conn) == 0:
                break


def f1(response):
    print(保存到文件,response.header_dict)

def f2(response):
    print(保存到数据库, response.header_dict)

url_list = [
    {host:www.baidu.com,callback: f1},
    {host:cn.bing.com,callback: f2},
    {host:www.cnblogs.com,callback: f2},
]

req = AsyncRequest()
for item in url_list:
    req.add_request(item[host],item[callback])

req.run()

 

以上是关于自定义异步IO框架的主要内容,如果未能解决你的问题,请参考以下文章

200行自定义异步非阻塞Web框架

200行自定义异步非阻塞Web框架

200行自定义异步非阻塞Web框架

200行自定义异步非阻塞Web框架

200行自定义异步非阻塞Web框架

200行自定义异步非阻塞Web框架