Python-异步编程
Posted JerryZao
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python-异步编程相关的知识,希望对你有一定的参考价值。
1、异步 同步
函数或方法被调用时,调用者是否得到最终的结果
直接得到最终结果的,就是同步调用
不直接得到最终结果的,就是异步调用
2、阻塞 非阻塞
函数或方法调用的时候,是否立刻返回
立即返回就是非阻塞
不立即返回就是阻塞调用
3、区别
同步,异步,与 阻塞,非阻塞 没有关系
同步,异步强调的是,是否得到最终的结果。
阻塞,非阻塞强调的是时间,是否等待
同步与异步区别在于:调用者是否得到了想要的结果
同步就是一直要执行到返回最终的结果
异步就是直接返回额,但是返回的不是最终的结果,调用者不能通过这种嗲用得到结果,需要童工被调用者的其他方式通知调用者,来取回最终结果。
阻塞与非阻塞 的区别在于,调用者是否还能干其他的事
阻塞,调用者就只能干等
非阻塞,调用者可以先去忙别的,不用一直等。
4、联系
同步阻塞:什么都不做,直到拿到最终的结果
同步非阻塞:等最终结果的期间,可以做别的
异步阻塞:给一个信息,让等通知,但是期间什么都不干
异步非阻塞:等通知,但是期间可以做别的
5、同步IO,异步IO,IO 多路复用
5.1、IO 两个阶段:
1、数据准备阶段
2、内核空间复制数据到用户进程缓冲区阶段
5.2、发生IO的时候:
1、内核从输入设备读,写数据
2、进程从内核复制数据
系统调用 ---- read 函数
6、IO 模型
同步IO:
同步IO 模型包括:阻塞IO,非阻塞IO, IO多路复用
***阻塞IO:
进程等待(阻塞),知道读写完成(全程等待)
read/ write 函数
****非阻塞IO
进程调用read操作,如果IO设备没有准备好,立即返回error ,进程不阻塞,用户可以再次发起系统调用,如果内核已经准备好,就阻塞,然后复制数据到用户空间。
第一阶段数据没有准备好,就先忙别的,等会再来看看,检查数据是否准备好了的过程是非阻塞的。
第二阶段是阻塞的,即内核空间和用户空间之间复制数据 是阻塞的。
****IO 多路复用
所谓IO 多路复用,就是同事监控多个IO,有一个准备好了,就不需要等了,开始处理,提高了同时处理IO 的能力
select 几乎所有的操作系统平台都支持,poll是对select的升级
epoll,Linux系统内核2.5+开始支持,对select和poll的增强,在监视的基础上,增加了回调机制,BSD,Mac平台有kqueque,windows有iocp
select 为例,将关注的IO 操作 告诉select函数 并调用,进程阻塞,内核监视select关注的文件描述符fd,被关注的任何一个fd对应的IO准备好了数据,select 返回,在使用read将数据复制到用户进程。
一般情况下,select最多能监听1024个fd(可以修改,不建议修改),但是由于select 采用轮询的方式,当管理的IO 多了,每次都要遍历全部额fd,效率低下(每次某个IO 设备准备好了,都需要遍历一遍select)
基于事件驱动的epoll没有管理的fd的上限, 且是回调机制,不需要遍历,效率很高。
事件驱动IO:
通知机制:
1、水平触发通知,一直通知,直到处理
2、边缘触发, 只通知一次
event 是事件驱动IO
异步IO:
进程发起异步IO 请求,立即返回,内核完成 IO 的两个阶段,内核给进程发一个信号
7、Python中 IO 多路 复用
IO 多路复用:
-
- 大多数操作系统都支持select 和poll
- Linux 2.5+ 支持epoll
- BSD,Mac 支持kqueque
- windoes的iocp
Python的select库
实现了select , poll 系统调用,这个基本上操作系统都支持,部分实现了epoll
底层的IO 多路复用模块。
开发中的选择:
1、完全夸平台,使用select ,poll,但是性能较差
2、针对不同操作系统自行选择支持的技术,
select 维护一个文件描述符数据结构,单个进程使用有上限,通常1024,线性扫描这个数据结构,效率低,
poll和select的区别是内部数据结构使用链表,没有这个最大限制,但是依然是线性遍历才能知道那个设备就绪了
epoll使用事件通知机制,使用回调机制提高效率
select poll 还要从内核空间复制消息到用户空间,而epoll通过内核空间 和用户空间共享的一块内存来减少复制(mmap))
8、selectors 库
3.4版本提 selectors 库, 高级IO 复用库
类层次结构: BaseSelector +-- SelectSelector 实现select +-- PollSelector 实现poll +-- EpollSelector 实现epoll +-- DevpollSelector 实现devpoll +-- KquequeSelector实现kqueque
selector.DefaultSelector 返回当前平台最有效,性能最高的实现、
源代码:
if \'KqueueSelector\' in globals(): DefaultSelector = KqueueSelector elif \'EpollSelector\' in globals(): DefaultSelector = EpollSelector elif \'DevpollSelector\' in globals(): DefaultSelector = DevpollSelector elif \'PollSelector\' in globals(): DefaultSelector = PollSelector else: DefaultSelector = SelectSelector
但是,由于没有实现Windows 下的IOCP ,所以,只能退化为select
abstractmethod register( fileobj, events, data=Nona)
为selector 注册一个文件对象,监视它的IO 事件,返回SelectKey对象
fileobj 被监视文件对象,例如:socket对象
events 事件,该文件对象必须等待的事件
data 可选的,与此文件对象相关联的不透明的数据,例如:关联用来存储每个客户端的会话ID,关联方法,通过这个参数在关注的时间产生后让二selector干什么事
selector.SelectorKey 有4 个属性
fileobj 注册的文件对象
fd 文件描述符
events 等待上面的文件描述符的文件对象的事件
data 注册时关联的数据
举例: 完成一个TCP Server, 能够接受客户端 请求并回应 客户端消息
1 import selectors 2 import threading 3 import socket 4 import logging 5 6 FORMAT = \'%(asctime)s %(threadName)s %(thread)s %(message)s\' 7 logging.basicConfig(level=logging.INFO, format=FORMAT) 8 9 # 构建缺省性能最优的selector 10 selector = selectors.DefaultSelector() 11 12 # 创建 tcp server 13 sock = socket.socket() 14 laddr = \'127.0.0.1\', 9999 15 sock.bind(laddr) 16 sock.listen() 17 logging.info(sock) 18 19 sock.setblocking(False) # 非阻塞 20 21 # 回调函数,自定义形参 22 def accept(sock:socket.socket, mask): 23 """ mask :事件 掩码的或值 """ 24 conn, raddr = sock.accept() # 这里虽然可以阻塞,但是事实上,阻塞是在 events = selector.select() 就处理了 25 conn.setblocking(False) # 不阻塞 26 logging.info(\'new socket {} in accept\'.format(conn)) 27 28 # 注册文件对象sock 关注读事件,返回SelectorKey 29 # 将sock,关注事件, data都绑定到key 实例属性上 30 key = selector.register(sock, selectors.EVENT_READ, accept) 31 logging.info(key) 32 # 2018-11-06 14:03:15,407 MainThread 2964 SelectorKey(fileobj=<socket.socket fd=272, 33 # family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, 34 # laddr=(\'127.0.0.1\', 9999)>, fd=272, events=1, data=<function accept at 0x0000000002930268>) 35 while True: 36 # 开始监听,等到有文件对象监控事件产生,返回(key, mask) 元组 37 events = selector.select() # 没有 准备好的io,会阻塞在这里。 38 logging.info(events)# 是一个二元组 组成的列表 39 \'\'\' 40 2018-11-06 14:06:20,215 MainThread 5236 [(SelectorKey(fileobj=<socket.socket fd=272, 41 family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, 42 laddr=(\'127.0.0.1\', 9999)>, fd=272, events=1, data=<function accept at 0x0000000002271268>), 1)] 43 \'\'\' 44 for key, mask in events: 45 logging.info(key) 46 logging.info(mask) 47 callback = key.data #accept 48 callback(key.fileobj, mask) # accept(key.fileobj, mask)
结果:
1 D:\\python3.7\\python.exe E:/code_pycharm/tt7.py 2 2018-11-06 14:11:01,816 MainThread 8520 <socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999)> 3 2018-11-06 14:11:01,817 MainThread 8520 SelectorKey(fileobj=<socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999)>, fd=272, events=1, data=<function accept at 0x0000000002950268>) 4 2018-11-06 14:11:03,851 MainThread 8520 [(SelectorKey(fileobj=<socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999)>, fd=272, events=1, data=<function accept at 0x0000000002950268>), 1)] 5 2018-11-06 14:11:03,851 MainThread 8520 SelectorKey(fileobj=<socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999)>, fd=272, events=1, data=<function accept at 0x0000000002950268>) 6 2018-11-06 14:11:03,851 MainThread 8520 1 7 2018-11-06 14:11:03,851 MainThread 8520 new socket <socket.socket fd=268, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999), raddr=(\'127.0.0.1\', 58313)> in accept
测试:处于一直监听状态
1 import selectors 2 import threading 3 import socket 4 import logging 5 6 FORMAT = \'%(asctime)s %(threadName)s %(thread)s %(message)s\' 7 logging.basicConfig(level=logging.INFO, format=FORMAT) 8 9 # 构造缺省性能最优elector 10 selector = selectors.DefaultSelector() 11 12 # 创建TCP server 13 sock = socket.socket() 14 laddr = \'127.0.0.1\', 9999 15 sock.bind(laddr) 16 sock.listen() 17 logging.info(sock) 18 19 sock.setblocking(False) # 非阻塞 20 21 # 回调函数,自己定义参数 22 def accept(sock:socket.socket, mask): 23 conn, raddr = sock.accept() 24 conn.setblocking(False) 25 26 logging.info(\'new socket {} in accept\'.format(conn)) 27 key = selector.register(conn, selectors.EVENT_READ, read) 28 logging.info(\'--------------------\') 29 logging.info(key) 30 31 # 回调函数 32 def read(conn:socket.socket, mask): 33 logging.info(\'===================\') 34 data = conn.recv(1024) 35 msg = \'your msg = {} ===\'.format(data.decode()) 36 logging.info(msg) 37 conn.send(msg.encode()) 38 39 # 注册 文件对象sock 关注读事件, 返回SelectorKey 40 # 将sock, 关注事件,data都 绑定到key实例属性上 41 key = selector.register(sock, selectors.EVENT_READ, accept) 42 logging.info(key) 43 44 event = threading.Event() 45 46 def select(): 47 while not event.is_set():# 一直处于监听状态 48 # 开始监听 ,等到有文件对象监控事件产生, 返回(key, mask) 元组 49 events = selector.select() 50 for key, mask in events: 51 callback = key.data 52 logging.info(\'********************\') 53 callback(key.fileobj, mask) 54 55 threading.Thread(target=select, name=\'select\').start() 56 57 def main(): 58 while not event.is_set(): 59 cmd = input(\'>>>\') 60 if cmd.strip() ==\'quit\': 61 event.set() 62 fobjs = [] 63 logging.info(\'{}\'.format(list(selector.get_map().items()))) 64 65 # 字典遍历,不能直接操作数据 66 for fd, key in selector.get_map().items(): # 返回注册的项 67 print(fd) # 从打印可看出,开始用于accept的sock 也被记录当中 68 print(key.fileobj) 69 fobjs.append(key.fileobj) 70 71 for fobj in fobjs: 72 selector.unregister(fobj) 73 fobj.close() 74 selector.close() 75 76 if __name__ == \'__main__\': 77 main()
结果:
1 D:\\python3.7\\python.exe E:/code_pycharm/tt8.py 2 2018-11-06 14:53:39,911 MainThread 9256 <socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999)> 3 2018-11-06 14:53:39,911 MainThread 9256 SelectorKey(fileobj=<socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999)>, fd=272, events=1, data=<function accept at 0x00000000004CC1E0>) 4 >>>2018-11-06 14:53:55,211 select 8388 ******************** 5 2018-11-06 14:53:55,221 select 8388 new socket <socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999), raddr=(\'127.0.0.1\', 59300)> in accept 6 2018-11-06 14:53:55,221 select 8388 -------------------- 7 2018-11-06 14:53:55,221 select 8388 SelectorKey(fileobj=<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999), raddr=(\'127.0.0.1\', 59300)>, fd=280, events=1, data=<function read at 0x0000000002A29510>) 8 2018-11-06 14:53:56,851 select 8388 ******************** 9 2018-11-06 14:53:56,851 select 8388 new socket <socket.socket fd=296, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999), raddr=(\'127.0.0.1\', 59301)> in accept 10 2018-11-06 14:53:56,851 select 8388 -------------------- 11 2018-11-06 14:53:56,851 select 8388 SelectorKey(fileobj=<socket.socket fd=296, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999), raddr=(\'127.0.0.1\', 59301)>, fd=296, events=1, data=<function read at 0x0000000002A29510>) 12 13 14 >>>>>>quit 15 2018-11-06 14:54:03,426 MainThread 9256 [(272, SelectorKey(fileobj=<socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999)>, fd=272, events=1, data=<function accept at 0x00000000004CC1E0>)), (280, SelectorKey(fileobj=<socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999), raddr=(\'127.0.0.1\', 59300)>, fd=280, events=1, data=<function read at 0x0000000002A29510>)), (296, SelectorKey(fileobj=<socket.socket fd=296, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999), raddr=(\'127.0.0.1\', 59301)>, fd=296, events=1, data=<function read at 0x0000000002A29510>))] 16 272 17 <socket.socket fd=272, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999)> 18 280 19 <socket.socket fd=280, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999), raddr=(\'127.0.0.1\', 59300)> 20 296 21 <socket.socket fd=296, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 9999), raddr=(\'127.0.0.1\', 59301)> 22 23 Process finished with exit code 0
实现ChatServer 群聊:
测试1:只监听了读,收到客户端信息,直接发送信息,给客户端。并没有监听客户端。
1 import threading 2 import selectors 3 import logging 4 import socket 5 6 FORMAT = \'%(asctime)s %(threadName)s %(thread)s %(message)s\' 7 logging.basicConfig(level=logging.INFO, format=FORMAT) 8 9 10 class ChatServer: 11 def __init__(self, ip=\'127.0.0.1\', port=9999): 12 self.laddr = ip, port 13 self.sock = socket.socket() 14 self.event = threading.Event() 15 self.selector = selectors.DefaultSelector() 16 17 def start(self): 18 self.sock.bind(self.laddr) 19 self.sock.listen() 20 21 self.selector.register(self.sock, selectors.EVENT_READ, self.accept) 22 threading.Thread(target=self.select, name=\'select\', daemon=True).start() 23 24 def select(self): 25 while not self.event.is_set(): 26 events = self.selector.select() 27 for key, mask in events: 28 callbakck = key.data 29 callbakck(key.fileobj, mask) 30 31 def accept(self,sock, mask): 32 conn, raddr = sock.accept() 33 key = self.selector.register(conn, selectors.EVENT_READ, self.send) 34 35 def send(self,conn, mask): 36 data = conn.recv(1024) 37 38 if data.strip() == b\'quit\' or data == b\'\': 39 self.selector.unregister(conn) 40 conn.close() 41 return 42 43 for key in self.selector.get_map().values(): 44 if key.data !=self.accept: 45 msg = \'your msg is {}\'.format(data.decode()).encode() 46 key.fileobj.send(msg) 47 48 def stop(self): 49 self.event.set() 50 fobjs = [] 51 for fd, key in self.selector.get_map().items(): 52 fobjs.append(key.fileobj) 53 for fobj in fobjs: 54 self.selector.unregister(fobj) 55 fobj.close() 56 self.selector.close() 57 58 def main(): 59 cs = ChatServer() 60 cs.start() 61 62 while True: 63 print(\'=======\') 64 cmd = input(">>>>") 65 if cmd == \'quit\': 66 cs.stop() 67 break 68 print(threading.enumerate()) 69 70 if __name__ == \'__main__\': 71 main()
将服务端封装为了一个类,实例化 一个对象,进行操作
使用了IO 复用 的模型,不需要创建容器去收纳客户端,selects,提供了一个字典,收纳 (fd, selectkey)
不需要多线程,只需要将阻塞主线程的监听放在一个新线程中,不会阻塞主线程,就行。
只需要循环监听函数就行,不需要 循环 send 和 recv
测试2:实现读写分离,利用中间容器,存放读到的东西,写的时候,从里边拿。这里使用了Queue.
self.selector.register(conn, selectors.EVENT_READ | selectors.EVENT_WRITE, self.recv)
注册语句,要监听 selectors.EVENT_READ | selectors.EVENT_WRITE 读 与 写 事件
回调的时候,需要mask 来判断究竟是读触发了,还是写触发了,所以,需要修改方法声明,增加mask
注意:读 和 写 是分离的,那么,handle 函数,应该这样写:
1 def handle(self, sock, mask): 2 if mask & selectors.EVENT_READ: 读是1,写是2(十进制),二进制01,10,所以位与 3 pass 4 5 # 注意: 这里是某一个socket的写操作 6 if mask & selectors.EVENT_WRITE:# 写缓冲区准备好了,可以写数据了 7 pass
handle方法 里面处理读, 写,mask有可能是0b01 0b10, 0b11
问题是,假设读取到了客户端发来的数据后,如何写进去》?
为每一个与客户端连接的socket对象增加对应的队列
与每一个客户端连接的socket对象,自己维护一个队列,某一个客户端收到消息后,会遍历发给所有客户端的队列,这里完成一对多,即一份数据放到了所有的队列中。
与每一个客户端练级的socket对象,发现自己对联有数据,就发送给客户端。
1 import threading 2 import selectors 3 import logging 4 import socket 5 from queue import Queue 6 7 FORMAT = \'%(asctime)s %(threadName)s %(thread)s %(message)s\' 8 logging.basicConfig(level=logging.INFO, format=FORMAT) 9 10 11 class ChatServer: 12 def __init__(self, ip=\'127.0.0.1\', port=9999): 13 self.laddr = ip, port 14 self.sock = socket.socket() 15 self.event = threading.Event() 16 self.selector = selectors.DefaultSelector() 17 18 def start(self): 19 self.sock.bind(self.laddr) 20 self.sock.listen() 21 22 self.selector.register(self.sock, selectors.EVENT_READ, self.accept) 23 threading.Thread(target=self.select, name=\'select\', daemon=True).start() 24 25 def select(self): 26 while not self.event.is_set(): 27 events = self.selector.select() 28 for key, mask in events: 29 if key.data == self.accept: 30 callbakck = key.data 31 callbakck(key.fileobj) 32 else: 33 logging.info(\'{}\'.format(threading.enumerate())) 34以上是关于Python-异步编程的主要内容,如果未能解决你的问题,请参考以下文章