异步编程学习
Posted czaorz
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了异步编程学习相关的知识,希望对你有一定的参考价值。
原文章来源于:
https://mp.weixin.qq.com/s?__biz=MzIxMjY5NTE0MA==&mid=2247483720&idx=1&sn=f016c06ddd17765fd50b705fed64429c
原项目GitHub:
https://github.com/denglj/aiotutorial
原文章写得很精彩,但有些代码还是可以优化下的。而且这文章一直只有上篇,可惜了。
接下来按个人见解,从代码角度解析下这篇文章:
前提知识讲解:
1、计算机资源:常分为CPU资源、内存资源、硬盘资源和网络资源
2、进程阻塞:正在运行的程序,由于自身某个模块需要使用硬盘或网络I/O资源等,而系统又未及时响应,导致进程处于待机状态,直至等待事件作出回应后才会被唤醒。
3、进程非阻塞:同理,在获取某些资源时,不会等待结果响应,而是继续处理其他模块。
我们以socket为例,如下可获取阻塞与非阻塞两种编程
import socket sock = socket.socket()
socket.setblocking(True) # 默认就是阻塞。即套接字 建立连接/发送请求/接受请求 的时候,是阻塞的。
socket.setblocking(False) # 设置为非阻塞,即上述请求过程不会阻塞,而是继续处理其他模块。
使用原生asyncio编写异步程序:
在此代码中,我们需要注意几个关键点
1、loop = asyncio.get_event_loop() # 开启事件循环,异步"任务"将在此循环执行
2、asyncio.create_task() # 将一个协程包装成一个"任务"排入日程准备执行
3、asyncio.gather() # 同步执行"任务"
import asyncio import aiohttp import time loop = asyncio.get_event_loop() async def fetch(): async with aiohttp.ClientSession(loop=loop) as session: async with session.get(‘http://www.baidu.com‘) as response: print(await response.read()) async def multi_fetch(): await asyncio.gather(*[asyncio.create_task(fetch()) for _ in range(10)]) if __name__ == ‘__main__‘: start = time.time() loop.run_until_complete(fetch()) # 执行一次 # loop.run_until_complete(multi_fetch()) # 执行十次 print(time.time() - start)
接下来我们就来一步一步的实现上述几个关键点,实现手写自己的异步程序
首先得实现一个阻塞程序,以socket为例。此例子比较简单,大致看一下即可。
用时的话,我们可以明显看出blocking_socket() 用时大致0.7s
而multi_blocking_socket() 用时大致0.7s,刚好是10倍左右。
import socket import time def blocking_socket(response=b‘‘): sock = socket.socket() # 默认为阻塞连接 sock.connect((‘www.baidu.com‘, 80)) # 建立百度TCP连接 sock.send(b‘GET / HTTP/1.0 ‘) # 发送HTTP协议 chunk = sock.recv(1024) # 接收数据 while chunk: response += chunk chunk = sock.recv(1024) return response def mutil_blocking_socket(): return [blocking_socket() for _ in range(10)].__len__() if __name__ == ‘__main__‘: start = time.time() blocking_socket() # 执行一次 # mutil_blocking_socket() # 执行十次 print(time.time() - start)
接着我们来实现非阻塞程序,仍然使用socket来实现
来比较时间,我们会惊讶的发现,no_blocking_socket() 执行一次大致0.07s
而multi_no_blocking_socket() 执行十次,大致是0.7s....
没错,花费时间和阻塞编程是一样的。我们的非阻塞编程并没有达到实际的效果。
import socket import time def no_blocking_socket(response=b‘‘): sock = socket.socket() sock.setblocking(False) # 设置非阻塞连接 try: sock.connect((‘www.baidu.com‘, 80)) except BlockingIOError: # 非阻塞式建立连接在此处会报错,捕获忽略即可 pass while True: try: sock.send(b‘GET / HTTP/1.0 ‘) # 发送HTTP协议 break except OSError: # 此处会报错。因为套接字与百度服务器的TCP连接还没有建立,
# 但是套接字却是非阻塞的,故在未建立连接的情况下发送协议会报错,此处捕获此异常 continue while True: try: chunk = sock.recv(1024) while chunk: response += chunk chunk = sock.recv(1024) return response except OSError: # 同理,对方服务器未接收HTTP协议,不会返回数据,即使返回,也有时延。故此时接收数据会报异常,此处捕获此异常 pass def multi_no_blocking_socket(): return [no_blocking_socket() for _ in range(10)].__len__() if __name__ == ‘__main__‘: start = time.time() no_blocking_socket() # 执行一次 # multi_no_blocking_socket() # 执行十次 print(time.time() - start)
我们来分析一下,为什么会造成这种现象。我们发现这套非阻塞代码与之前的阻塞代码相比,
多了三处 try: ... except: ...
多了两处 while True: ...
代码确实是非阻塞编程,也就是程序不会在网络IO模块处阻塞,但是程序也没有把空闲下来的时间花在"正确"的地方
在非阻塞的情况下,CUP把空闲下来的时间不停的去"试错",也就是程序会不停的尝试发送协议,直到发送成功。不停的尝试接收数据,直到接收成功。
这样就导致我们的非阻塞编程,实际上和阻塞编程是一样的,唯一要说不同。就是非阻塞程序中的CPU可能会 "忙一点"。
那么我们可以注意到了问题的关键,就是我们不知道什么时候程序是 "准备就绪了",也就是什么时候可以发送协议,什么时候可以接收数据。
其实操作系统已经帮我们实现了,它将事件的 I/O 都封装成了事件,如可读事件,可写事件。那么我们就可以立即想到,当我们的套接字状态变为我们所需要的时候,
就立即执行接下来的步骤。所以此处,我们就需要 "回调"!
这里我们说明下python中selectors模块,用于注册事件的回调
from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE sock = socket.socket() # 获取套接字
selector = DefaultSelector() # 获取selector对象 selector.register(sock.fileno(), EVENT_WRITE, on_send) # fileno()获取当前socket套接字的文件描述符,并绑定事件EVENT_WRITE,回调函数为on_send selector.unregister(sock.fileno()) # 注销事件绑定 selector.register(sock.fileno(), EVENT_READ, on_recv) # 同理,绑定事件EVENT_READ,回调函数为on_recv
while True:
events = selector.select() # 此处的events是操作系统返回的事件,也就是我们绑定的事件被触发了,
# 此处是阻塞获取的。也就是用一个事件循环的阻塞,来代替我们的while True: ...
for sock, mask in events:
sock.data() #sock.data为绑定的回调函数,也就是上面的on_send和on_recv
如此我们就不需要自己手动while True来监控事件状态的改变,将这件工作交给事件循环。此处我们将其命名为loop。
接下来就是回调编程了
import socket import time from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE selector = DefaultSelector() stop_loop = 10 class Crawler: def __init__(self, flag=10): self.flag = flag self.sock = None self.response = b‘‘ def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect((‘www.baidu.com‘, 80)) except BlockingIOError: pass
# fileno()获取当前socket套接字的文件描述符,并绑定事件回调
selector.register(self.sock.fileno(), EVENT_WRITE, self.on_send)
def on_send(self): selector.unregister(self.sock.fileno()) self.sock.send(b‘GET / HTTP/1.0 ‘) selector.register(self.sock.fileno(), EVENT_READ, self.on_recv) def on_recv(self): chunk = self.sock.recv(1024) if chunk: self.response += chunk else: global stop_loop stop_loop -= self.flag selector.unregister(self.sock.fileno()) def loop(): # 事件循环,由操作系统返回那个事件发生了,对应执行那些事件的回调。 while stop_loop: events = selector.select() for sock, mask in events: sock.data() if __name__ == ‘__main__‘: start = time.time() Crawler(10).fetch() # 执行一次 # [Crawler(1).fetch() for _ in range(10)] # 执行十次 loop() print(time.time() - start)
我们来捋一下代码执行的流程:
1、首先实例化一个Crawler对象,然后执行此实例的fetch方法
2、fetch方法发起了与百度服务器的连接,然后注册了回调函数。
3、此时会走到loop()函数来,执行事件循环。直到我们与对方服务器的连接建立成功,则此时OS会返回事件,我们则执行对应的回调事件 sock.data() <=> self.on_send()
4、执行on_recv,首先注销上一个事件,然后发送协议,再接着注册可读事件。继续进入等待
5、此时继续进入loop事件循环,直到触发注册事件,执行回调函数on_recv,由于一次只接收1024,故可能会接收多次,也就是会触发多次on_recv事件的回调,直接接收完成。
6、接收完成,我们令全局变量stop_loop - flag,来停止loop事件循环。程序结束。
然后我们来看下时间,我们会发现执行一次的时间和执行十次的时间基本是差不多的。说明我们编写的程序是没有问题的。
回调式异步编程,成功!
至此,我们已经实现了回调式异步编程,但是我们思考下第一个例子,是基于协程的异步编程,故我们现在来调整代码,编写协程。
import socket import time from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE selector = DefaultSelector() stop_loop = 10 class Future: # 用于存放未来可能出现的数据,当出现时执行一次回调函数 # 此中的result仅作为一个中转,实际还是通过回调返回给协程 def __init__(self): self.result = None self.callback = None def set_callback(self, func): self.callback = func def set_result(self, result): self.result = result self.callback(self) if self.callback else None class Task: # 用于启动协程,该类实例初始化时传入为协程对象,执行self.process方法 # 调用协程的send方法,启动协程,并最后绑定回调函数 def __init__(self, co_routine): self.co_routine = co_routine future = Future() self.process(future) def process(self, future): try: next_future = self.co_routine.send(future.result) except StopIteration: return next_future.set_callback(self.process) class Crawler: def __init__(self, flag=10): self.flag = flag self.sock = None self.response = b‘‘ def fetch(self): self.sock = socket.socket() self.sock.setblocking(False) try: self.sock.connect((‘www.baidu.com‘, 80)) except BlockingIOError: pass future = Future() def _on_send(): future.set_result(None) def _on_recv(): future.set_result(self.sock.recv(1024)) selector.register(self.sock.fileno(), EVENT_WRITE, _on_send) yield future selector.unregister(self.sock.fileno()) self.sock.send(b‘GET / HTTP/1.0 ‘) selector.register(self.sock.fileno(), EVENT_READ, _on_recv) while True: chunk = yield future # 在此处轮询EVENT_READ事件,直至所有数据加载完毕 if chunk: self.response += chunk else: global stop_loop stop_loop -= self.flag return self.response def loop(): while stop_loop: events = selector.select() for sock, mask in events: sock.data() if __name__ == ‘__main__‘: start = time.time() Task(Crawler(10).fetch()) # 传入协程fetch,使用Task实例化调用协程的send方法来启动协程 # [Task(Crawler(1).fetch()) for _ in range(10)] # 同理,启动十个协程任务 loop() print(time.time() - start)
我们继续来整理下代码执行的流程:
1、首先实例化一些Crawler对象,调用调用此实例的fetch函数得到一个协程。此时协程是没有执行的。(协程需要send等触发才会执行)
2、将此协程装于Task用来创建任务实例,在任务中会主动触发协程的send函数来启动协程
3、此时协程已触发,注册事件 selector.register(self.sock.fileno(), EVENT_WRITE, _on_send),然后返回future,此时协程到此暂停
4、返回的future会添加任务的回调函数,也就是self.precess()。而loop也开始了事件轮询,当套接字的文件描述符状态变为可写状态时,触发回调方法_on_send
5、_on_send方法执行Future中的set_result方法,此时在此方法中会调用一次future注册的回调函数,继续触发任务Task中协程的send方法,回到协程上次暂停的状态
6、回来后,首先注销事件EVENT_WRITE。发送HTTP协议请求。再注册事件 selector.register(self.sock.fileno(), EVENT_READ, _on_recv)
7、loop事件轮询,当套接字的文件描述符变为可读状态时,触发回调方法_on_recv
8、_on_recv方法执行Future中的set_result方法,此时在方法中会初始化result为sock.recv(1024)的值,并执行注册的回调函数,将此结果继续传递至协程上回暂停的地方
9、由于一直没有注销事件EVENT_READ,故会一直驱动事件轮询直至结束
10、Task、Future、Crawler、loop这四个就这么神奇的串联在一起了,不可思议的说。
不过这样写貌似不台好看,虽然感觉也可以,但是很多模块其实都是可以拆离开的
下面就是一个拆分版本,就不细细的分析流程啦
import socket import time from selectors import DefaultSelector, EVENT_READ, EVENT_WRITE selector = DefaultSelector() stop_loop = 10 def fetch(sock): sock.setblocking(False) try: sock.connect((‘www.baidu.com‘, 80)) except BlockingIOError: pass future = Future() def _on_send(): future.set_result(None) selector.register(sock.fileno(), EVENT_WRITE, _on_send) yield from future selector.unregister(sock.fileno()) return future def read(sock, future, flag, response=b‘‘): def _on_recv(): future.set_result(sock.recv(1024)) selector.register(sock.fileno(), EVENT_READ, _on_recv) chunk = yield from future while chunk: response += chunk chunk = yield from future selector.unregister(sock.fileno()) global stop_loop stop_loop -= flag return response def loop(): while stop_loop: events = selector.select() for sock, mask in events: sock.data() class Future: def __init__(self): self.result = None self.callback = None def set_callback(self, func): self.callback = func def set_result(self, result): self.result = result self.callback(self) if self.callback else None def __iter__(self): yield self return self.result class Task: def __init__(self, co_routine): self.co_routine = co_routine future = Future() self.process(future) def process(self, future): try: next_future = self.co_routine.send(future.result) except StopIteration: return next_future.set_callback(self.process) class Crawler: def __init__(self, flag): self.flag = flag def fetch(self): sock = socket.socket() future = yield from fetch(sock) sock.send(b‘GET / HTTP/1.0 ‘) response = yield from read(sock, future, self.flag) print(response) if __name__ == ‘__main__‘: start = time.time() Task(Crawler(10).fetch()) # [Task(Crawler(1).fetch()) for _ in range(10)] loop() print(time.time() - start)
代码放到github上了
https://github.com/CzaOrz/ioco/tree/master/open_source_project/%E5%BC%82%E6%AD%A5%E6%95%99%E7%A8%8B%E5%AD%A6%E4%B9%A0/%E5%BC%82%E6%AD%A5%E5%8D%8F%E7%A8%8B
以上是关于异步编程学习的主要内容,如果未能解决你的问题,请参考以下文章