小白对异步IO的理解
Posted ytbhandsome
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了小白对异步IO的理解相关的知识,希望对你有一定的参考价值。
前言
看到越来越多的大佬都在使用python的异步IO,协程等概念来实现高效的IO处理过程,可是我对这些概念还不太懂,就学习了一下。 因为是初学者,在理解上有很多不到位的地方,如果有错误,还希望能够有人积极帮我指出。
下面就使用一个简单的爬虫的例子,通过一步一步的改进,最后来用异步IO的方式实现。
1. 阻塞的IO
我们要实现一个爬虫,去爬百度首页n次,最简单的想法就是依次下载,从建立socket连接到发送网络请求再到读取响应数据,顺序进行。
代码如下:
1 import time 2 import socket 3 import sys 4 5 def doRequest(): 6 sock = socket.socket() 7 sock.connect((‘www.baidu.com‘,80)) 8 sock.send("GET / HTTP/1.1 Host: www.baidu.com Connection: Close ".encode("utf-8")) 9 response = sock.recv(1024) 10 return response 11 12 def main(): 13 start = time.time() 14 for i in range(int(sys.argv[1])): 15 doRequest() 16 print("spend time : %s" %(time.time()-start)) 17 18 main()
因为socket是阻塞方式调用的,所以cpu执行到sock.connect()
,sock.recv()
,就会一直卡在那里直到socket的状态就绪,所以浪费了很多的CPU时间。
请求10次和20次的时间分别如下所示:
1 ? python3 1.py 10 2 spend time : 0.9282660484313965 3 ? python3 1.py 20 4 spend time : 1.732438087463379
可以看到,速度慢的跟蜗牛一样。
2. 改进1-并发
为了加快请求的速度,很容易想到我们可以使用并发的方式进行,那么最好的方式就是使用多线程了。修改后的代码如下:
1 # 多线程 2 3 import time 4 import socket 5 import sys 6 import threading 7 8 def doRequest(): 9 sock = socket.socket() 10 sock.connect((‘www.baidu.com‘,80)) 11 sock.send("GET / HTTP/1.1 Host: www.baidu.com Connection: Close ".encode("utf-8")) 12 response = sock.recv(1024) 13 return response 14 15 def main(): 16 start = time.time() 17 threads = [] 18 for i in range(int(sys.argv[1])): 19 # doRequest() 20 threads.append(threading.Thread(target=doRequest,args=())) 21 for i in threads: 22 i.start() 23 for i in threads: 24 i.join() 25 print("spend time : %s" %(time.time()-start))
使用线程之后,看一下请求10次和20次的时间:
1 ? python3 2.py 10 2 spend time : 0.1124269962310791 3 ? python3 2.py 20 4 spend time : 0.15438294410705566
速度明显快了很多,几乎是刚才的10倍了。
但是python的线程是有问题的,因为一个python进程中,同一时刻只允许一个线程运行,正在执行的线程会获取到GPL。做阻塞的系统调用时,例如sock.connect()
,sock.recv()
时,当前线程会释放GIL,让别的线程有机会获取GPL,然后执行。但是这种获取GPL的调度策略是抢占式的,以保证同等优先级的线程都有均等的执行机会,那带来的问题是:并不知道下一时刻是哪个线程被运行,也不知道它正要执行的代码是什么。所以就可能存在竞态条件。这种竞争有可能使某些线程处于劣势,导致一直获取不到GPL
比如如下的情况,线程1执行的代码如下:
1 flag = True 2 while flag: 3 ..... # 这里省略一些复杂的操作,会调用多次IO操作 4 time.sleep(1)
可以看到,线程1的任务非常简单,而线程2的任务非常复杂,这就会导致CPU不停地去执行线程1,而真正做实际工作的线程2却很少被调度到,导致了浪费了大量的CPU资源。
3. 改进2-非阻塞方式
在第一个例子中,我们意识到浪费了大量的时间,是因为我们用了阻塞的IO,导致CPU在卡在那里等待IO的就绪,那使用非阻塞的IO,是不是就可以解决这个问题了。
代码如下:
1 import time 2 import socket 3 import sys 4 5 def doRequest(): 6 sock = socket.socket() 7 sock.setblocking(False) 8 try: 9 sock.connect((‘www.baidu.com‘,80)) 10 except BlockingIOError: 11 pass 12 13 # 因为设置为非阻塞模式了,不知道何时socket就绪,需要不停的监控socket的状态 14 while True: 15 try: 16 sock.send("GET / HTTP/1.1 Host: www.baidu.com Connection: Close ".encode("utf-8")) 17 # 直到send 不抛出异常,就发送成功了 18 break 19 except OSError: 20 pass 21 while True: 22 try: 23 response = sock.recv(1024) 24 break 25 except OSError: 26 pass 27 return response 28 def main(): 29 start = time.time() 30 for i in range(int(sys.argv[1])): 31 doRequest() 32 print("spend time : %s" %(time.time()-start)) 33 34 main()
sock.setblocking(False)
把socket设置为非阻塞式的,也就是说执行完sock.connect()
和sock.recv()
之后,CPU不再等待IO了,会继续往下执行,来看一下执行时间:
1 ? python3 3.py 10 2 spend time : 1.0597507953643799 3 ? python3 3.py 20 4 spend time : 2.0327329635620117
感觉被骗了,速度还是跟第一个一样啊,看来非阻塞IO并没有什么速度上的提升啊,问题出在哪里呢?看代码发现多了两个while循环:
1 while True: 2 try: 3 sock.send("GET / HTTP/1.1 Host: www.baidu.com Connection: Close ".encode("utf-8")) 4 # 直到send 不抛出异常,就发送成功了 5 break 6 except OSError: 7 pass 8 while True: 9 try: 10 response = sock.recv(1024) 11 break 12 except OSError: 13 pass
因为把socket设置为非阻塞模式了,所以CPU并不知道IO什么时候就绪,所以必须在这里不停的尝试,直到IO可以使用了为止。
虽然 connect() 和 recv() 不再阻塞主程序,空出来的时间段CPU没有空闲着,但并没有利用好这空闲去做其他有意义的事情,而是在循环尝试读写 socket (不停判断非阻塞调用的状态是否就绪)。
有没有办法让CPU空闲出来的时间,不用来不停的询问IO,而是干别的更有意义的事情呢,等IO就绪后再通知CPU回来处理呢?当然有了,那就是回调。
4. 改进3-回调
操作系统已经把IO状态的改变封装成了事件,如可读事件、可写事件。并且可以为这些事件绑定处理函数。所以我们可以使用这种方式,为socket的IO状态的变化绑定处理函数,交给系统进行调动,这样就是回调方式。python的select模块支持这样的操作。
代码如下:
1 import socket 2 from selectors import DefaultSelector, EVENT_WRITE, EVENT_READ 3 import sys 4 selector = DefaultSelector() 5 stopped = False 6 urls_todo = [] 7 8 class Crawler: 9 def __init__(self, url): 10 self.url = url 11 self.sock = None 12 self.response = b‘‘ 13 14 def fetch(self): 15 self.sock = socket.socket() 16 self.sock.setblocking(False) 17 try: 18 self.sock.connect((‘www.baidu.com‘, 80)) 19 except BlockingIOError: 20 pass 21 selector.register(self.sock.fileno(), EVENT_WRITE, self.connected) 22 23 def connected(self, key, mask): 24 selector.unregister(key.fd) 25 get = ‘GET {0} HTTP/1.0 Host: www.baidu.com ‘.format(self.url) 26 self.sock.send(get.encode(‘ascii‘)) 27 selector.register(key.fd, EVENT_READ, self.read_response) 28 29 def read_response(self, key, mask): 30 global stopped 31 # 如果响应大于4KB,下一次循环会继续读 32 chunk = self.sock.recv(4096) 33 if chunk: 34 self.response += chunk 35 else: 36 selector.unregister(key.fd) 37 urls_todo.remove(self.url) 38 if not urls_todo: 39 stopped = True 40 41 def loop(): 42 while not stopped: 43 # 阻塞, 直到一个事件发生 44 events = selector.select() 45 for event_key, event_mask in events: 46 callback = event_key.data 47 callback(event_key, event_mask) 48 49 if __name__ == ‘__main__‘: 50 import time 51 start = time.time() 52 for i in range(int(sys.argv[1])): 53 urls_todo.append("/"+str(i)) 54 crawler = Crawler("/"+str(i)) 55 crawler.fetch() 56 loop() 57 print("spend time : %s" %(time.time()-start))
监控socket的状态,如果变为可写的,就往里面写数据
selector.register(self.sock.fileno(), EVENT_WRITE, self.connected)
监控socket的状态,如果变为可读的,就外读数据
selector.register(key.fd, EVENT_READ, self.read_response)
测试一下速度:
1 ? python3 4.py 10 2 spend time : 0.03910994529724121 3 ? python3 4.py 20 4 spend time : 0.04195284843444824
我们看到速度已经有个一个质的飞跃了,但是回调用一些严重的问题,会破坏代码的本来的逻辑结构,造成代码可读性很差。
比如我们有函数 funcA,funcB,funcC三个函数,如果funcC处理的结果依赖于funcB的处理结果,funcB的处理结果依赖于funcA的处理结果,而funcA又是回调的方式调用的,所以就不知道funcA什么时候返回,所以只能将后续的处理都作为callback的方式传入funcA中,让funcA执行完了可以执行funcB,funcB执行完了可以执行funcC,看起来像下面这样:
funcA(funcB(funcC()))
这就形成了一个链式的回调,跟最初的代码逻辑完全相反,本来的逻辑应该是这样的。
funcC(funcB(funcA()))
因为这样的链式回调的出现,导致了理解代码逻辑困难,并且错误处理困难。
有没有方法避免这种地狱式的链式回调的呢?
5 .改进4-利用生成器
可以利用python的生成器,把发请求的函数写成一个生成器,然后只监控IO的状态,当IO状态发生改变之后,就给生成器传送值,驱动生成器进行下一步操作,这样就可以避免回调了,具体实现如下:
1 import select 2 import socket 3 import time 4 import sys 5 6 num = int(sys.argv[1]) 7 8 def coroutine(): 9 sock = socket.socket() 10 sock.setblocking(0) 11 address = yield sock 12 try: 13 sock.connect(address) 14 except BlockingIOError: 15 pass 16 data = yield 17 size = yield sock.send(data) 18 yield sock.recv(size) 19 20 def main(): 21 inputs = [] 22 outputs = [] 23 coros = [] 24 coro_dict = dict() 25 for i in range(num): 26 coros.append(coroutine()) 27 sock = coros[-1].send(None) # 发送一个None值来启动生成器 28 outputs.append(sock.fileno()) # 29 # print(outputs) 30 coro_dict[sock.fileno()] = coros[-1] 31 coros[-1].send((‘www.baidu.com‘, 80)) 32 while True: 33 r_list,w_list,e_list = select.select(inputs,outputs, ()) 34 for i in w_list: 35 # print(type(i)) 36 coro_dict[i].send(b‘GET / HTTP/1.1 Host: www.baidu.com Connection: Close ‘) 37 outputs.remove(i) 38 inputs.append(i) 39 for i in r_list: 40 coro_dict[i].send(1024) 41 inputs.remove(i) 42 if len(inputs) == len(outputs) == 0: 43 break 44 # time.sleep(2) 45 # coro.send(b‘GET / HTTP/1.1 Host: www.baidu.com Connection: Close ‘) 46 # select.select(wait_list, (), ()) 47 # print(coro.send(1024)) 48 49 start = time.time() 50 main() 51 print("spend time : %s" %(time.time()-start))
可以看到把发起请求的函数写成了一个生成器:
1 def coroutine(): 2 sock = socket.socket() 3 sock.setblocking(0) 4 address = yield sock 5 try: 6 sock.connect(address) 7 except BlockingIOError: 8 pass 9 data = yield 10 size = yield sock.send(data) 11 yield sock.recv(size)
然后监控IO状态,当IO状态发生改变之后,驱动生成器继续运行。
1 while True: 2 r_list,w_list,e_list = select.select(inputs,outputs, ()) 3 for i in w_list: 4 # print(type(i)) 5 coro_dict[i].send(b‘GET / HTTP/1.1 Host: www.baidu.com Connection: Close ‘) 6 outputs.remove(i) 7 inputs.append(i) 8 for i in r_list: 9 coro_dict[i].send(1024) 10 inputs.remove(i)
看一下程序执行时间:
1 ? python3 5.py 10 2 spend time : 0.058114051818847656 3 ? python3 5.py 20 4 spend time : 0.0949699878692627
效果貌似非常的棒啊,执行的太快了,但是当我执行300次请求的时候,我就发现问题了,返回的非常慢,。估计原因可能是select是顺序遍历每一个IO描述符的去做状态检查,当IO描述符过多的时候,会导致遍历的速度比较慢,所以造成时间花费很大。
以上是关于小白对异步IO的理解的主要内容,如果未能解决你的问题,请参考以下文章