事件驱动与异步IO使用
Posted ︻◣_蝸犇り~
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了事件驱动与异步IO使用相关的知识,希望对你有一定的参考价值。
事件驱动模型:
每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求,网络服务器采用此方式。
目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:
1. 有一个事件(消息)队列;
2. 鼠标按下时,往这个队列中增加一个点击事件(消息);
3. 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;
4. 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。
让我们用例子来比较和对比一下单线程、多线程以及事件驱动编程模型。下图展示了随着时间的推移,这三种模式下程序所做的工作。这个程序有3个任务需要完成,每个任务都在等待I/O操作时阻塞自身。阻塞在I/O操作上所花费的时间已经用灰色框标示出来了。
在单线程同步模型中,任务按照顺序执行。如果某个任务因为I/O而阻塞,其他所有的任务都必须等待,直到它完成之后它们才能依次执行。这种明确的执行顺序和串行化处理的行为是很容易推断得出的。如果任务之间并没有互相依赖的关系,但仍然需要互相等待的话这就使得程序不必要的降低了运行速度。
在多线程版本中,这3个任务分别在独立的线程中执行。这些线程由操作系统来管理,在多处理器系统上可以并行处理,或者在单处理器系统上交错执行。这使得当某个线程阻塞在某个资源的同时其他线程得以继续执行。与完成类似功能的同步程序相比,这种方式更有效率,但程序员必须写代码来保护共享资源,防止其被多个线程同时访问。多线程程序更加难以推断,因为这类程序不得不通过线程同步机制如锁、可重入函数、线程局部存储或者其他机制来处理线程安全问题,如果实现不当就会导致出现微妙且令人痛不欲生的bug。
在事件驱动版本的程序中,3个任务交错执行,但仍然在一个单独的线程控制中。当处理I/O或者其他昂贵的操作时,注册一个回调到事件循环中,然后当I/O操作完成时继续执行。回调描述了该如何处理某个事件。事件循环轮询所有的事件,当事件到来时将它们分配给等待处理事件的回调函数。这种方式让程序尽可能的得以执行而不需要用到额外的线程。事件驱动型程序比多线程程序更容易推断出行为,因为程序员不需要关心线程安全问题。
当我们面对如下的环境时,事件驱动模型通常是一个好的选择:
- 程序中有许多任务,而且…
- 任务之间高度独立(因此它们不需要互相通信,或者等待彼此)而且…
- 在等待事件到来时,某些任务会阻塞。
当应用程序需要在任务间共享可变的数据时,这也是一个不错的选择,因为这里不需要采用同步处理。
网络应用程序通常都有上述这些特点,这使得它们能够很好的契合事件驱动编程模型。
用户空间与内核空间
Linux简化了分段机制,使得虚拟地址与线性地址总是一致,因此,Linux的虚拟地址空间也为0~4G。
Linux内核将这4G字节的空间分为两部分(虚拟空间):
内核空间:将最高的1G字节(从虚拟地址0xC0000000到0xFFFFFFFF),供内核使用。(存放内存代码和数据)
用户空间:将较低的3G字节(从虚拟地址 0x00000000到0xBFFFFFFF),供各个进程使用。(用户程序的代码和数据)
每个进程可以通过系统调用进入内核,Linux内核由系统内的所有进程共享。从具体进程的角度来看,每个进程可以拥有4G字节的虚拟空间。
内核空间和用户空间一般通过系统调用进行通信
Linux使用两级保护机制:0级供内核使用,3级供用户程序使用。每个进程有各自的私有用户空间(0~3G),对系统中的其他进程是不可见的。最高的1GB字节虚拟内核空间则为所有进程以及内核所共享。
进程切换
进程切换:为了控制进程的执行,内核必须有能力挂起正在CPU上运行的进程,并恢复以前挂起的某个进程的执行。这种行为被称为进程切换。
因此可以说,任何进程都是在操作系统内核的支持下运行的,是与内核紧密相关的。 从一个进程的运行转到另一个进程上运行,这个过程中经过下面这些变化(很耗资源): 1. 保存处理机上下文,包括程序计数器和其他寄存器。 2. 更新PCB信息。 3. 把进程的PCB移入相应的队列,如就绪、在某事件阻塞等队列。 4. 选择另一个进程执行,并更新其PCB。 5. 更新内存管理的数据结构。 6. 恢复处理机上下文。 注:进程控制块(Processing Control Block),是操作系统核心中一种数据结构,主要表示进程状态。
其作用是使一个在多道程序环境下不能独立运行的程序(含数据),成为一个能独立运行的基本单位或与其它进程并发执行的进程。
或者说,OS是根据PCB来对并发执行的进程进行控制和管理的。PCB通常是系统内存占用区中的一个连续存区,它存放着操作系统用于描述进程情况及控制进程运行所需的全部信息
进程的阻塞
正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的
。
文件描述符fd
文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
缓存 I/O
缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。(数据 =====> 操作系统内核缓冲区 ====> 应用程序地址空间)
缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。
IO多路复用之select、selectors详解
select(rlist, wlist, xlist, timeout=None) 参数:
1.rlist: 等待直到读准备好;
2.wlist: 等待直到写操作准备好;
3.xlist: 等待一个"exceptional condition" ;
允许空序列, 但是如果3个参数都为空的列表的话, 在Unix上可以, 但在Windows上不行, 与平台相关 .
当timeout参数被设定之后, 函数将blocks 知道至少一个文件描述符 is ready, 值为0 表示 a poll and never block.
select 函数监视的文件描述符分3类,分别是writefds、readfds、和exceptfds。
调用后select函数会阻塞,直到有描述副就绪(有数据 可读、可写、或者有except),或者超时(timeout指定等待时间,如果立即返回设为null即可),函数返回。
当select函数返回后,可以 通过遍历fdset,来找到就绪的描述符。
select优点: 目前几乎在所有的平台上支持,其良好跨平台支持。
select缺点: 在于单个进程能够监视的文件描述符的数量存在最大限制,ulimit -n设置,可以通过修改宏定义甚至重新编译内核的方式提升这一限制,但是这样也会造成效率的降低。
1 import select 2 import socket 3 import queue 4 5 server = socket.socket() 6 7 server.bind(("localhost",9000)) 8 9 server.listen(100) 10 11 server.setblocking(False) 12 13 14 inputs = [server] 15 16 outputs = [] 17 18 msg_dic = {} 19 20 while True: 21 readable,writeable,exceptional = select.select(inputs,outputs,inputs) 22 print("readable:",readable, # readable是活跃的那个连接句柄 23 "writeable:",writeable, # writeable下一个循环要发送数据的连接句柄 24 "exceptional:",exceptional) # exceptional连接出错了的连接句柄 25 26 print("inputs:",inputs, # inputs里面是所有监听的连接句柄 27 "outputs:",outputs) 28 29 for r in readable: 30 if r is server: # server活跃证明是有新连接进来了 31 conn,addr = server.accept() 32 print("来了个新连接:",addr) 33 # 实现这个客户端发数据时server端能知道就需要让select加入监测这个conn 34 inputs.append(conn) # 新连接还没发送数据,现在接收数据程序会报错 35 36 msg_dic[conn] = queue.Queue() # 每个新连接都初始化一个队列 37 else: 38 data = r.recv(1024).decode() 39 print("收到数据:",data) 40 msg_dic[r].put(data.upper()) # 把要发送的数据写入到队列中 41 outputs.append(r) # r加入下次循环要发送消息的列表中 42 43 for w in writeable: # 客户端的连接列表 44 data_to_client = msg_dic[w].get() # 获取w在队列中的数据 45 w.sendall(data_to_client.encode()) # 发送给客户端队列中的数据 46 outputs.remove(w) # 确保下次循环的时候writeable不再返回这个已经处理完的连接 47 48 for e in exceptional: # 某个连接出错 49 if e in outputs: # 判断是否在outputs中 50 outputs.remove(e) 51 52 inputs.remove(e) # 每进来一个新连接都会加入到inputs中 53 54 del msg_dic[e] # 删除字典中的连接句柄
1 import select 2 import socket 3 import queue 4 5 6 server = socket.socket() 7 server.setblocking(0) 8 9 server_addr = (\'localhost\',10000) 10 11 print(\'starting up on %s port %s\' % server_addr) 12 server.bind(server_addr) 13 14 server.listen(5) 15 16 17 inputs = [server, ] #自己也要监测呀,因为server本身也是个fd 18 outputs = [] 19 20 message_queues = {} 21 22 while True: 23 print("waiting for next event...") 24 25 readable, writeable, exeptional = select.select(inputs,outputs,inputs) #如果没有任何fd就绪,那程序就会一直阻塞在这里 26 27 for s in readable: #每个s就是一个socket 28 29 if s is server: #别忘记,上面我们server自己也当做一个fd放在了inputs列表里,传给了select,如果这个s是server,代表server这个fd就绪了, 30 #就是有活动了, 什么情况下它才有活动? 当然 是有新连接进来的时候 呀 31 #新连接进来了,接受这个连接 32 conn, client_addr = s.accept() 33 print("new connection from",client_addr) 34 conn.setblocking(0) 35 inputs.append(conn) #为了不阻塞整个程序,我们不会立刻在这里开始接收客户端发来的数据, 把它放到inputs里, 下一次loop时,这个新连接 36 #就会被交给select去监听,如果这个连接的客户端发来了数据 ,那这个连接的fd在server端就会变成就续的,select就会把这个连接返回,返回到 37 #readable 列表里,然后你就可以loop readable列表,取出这个连接,开始接收数据了, 下面就是这么干 的 38 39 message_queues[conn] = queue.Queue() #接收到客户端的数据后,不立刻返回 ,暂存在队列里,以后发送 40 41 else: #s不是server的话,那就只能是一个 与客户端建立的连接的fd了 42 #客户端的数据过来了,在这接收 43 data = s.recv(1024) 44 if data: 45 print("收到来自[%s]的数据:" % s.getpeername()[0], data) 46 message_queues[s].put(data) #收到的数据先放到queue里,一会返回给客户端 47 if s not in outputs: 48 outputs.append(s) #为了不影响处理与其它客户端的连接 , 这里不立刻返回数据给客户端 49 50 else:#如果收不到data代表什么呢? 代表客户端断开了呀 51 print("客户端断开了",s) 52 53 if s in outputs: 54 outputs.remove(s) #清理已断开的连接 55 56 inputs.remove(s) #清理已断开的连接 57 58 del message_queues[s] ##清理已断开的连接 59 60 for s in writeable: 61 try : 62 next_msg = message_queues[s].get_nowait() 63 64 except queue.Empty: 65 print("client [%s]" %s.getpeername()[0], "queue is empty..") 66 outputs.remove(s) 67 68 else: 69 print("sending msg to [%s]"%s.getpeername()[0], next_msg) 70 s.send(next_msg.upper()) 71 72 for s in exeptional: 73 print("handling exception for ",s.getpeername()) 74 inputs.remove(s) 75 if s in outputs: 76 outputs.remove(s) 77 s.close() 78 79 del message_queues[s]
1 import socket, logging 2 import select, errno 3 4 logger = logging.getLogger("network-server") 5 6 def InitLog(): 7 logger.setLevel(logging.DEBUG) 8 9 fh = logging.FileHandler("network-server.log") 10 fh.setLevel(logging.DEBUG) 11 ch = logging.StreamHandler() 12 ch.setLevel(logging.ERROR) 13 14 formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s") 15 ch.setFormatter(formatter) 16 fh.setFormatter(formatter) 17 18 logger.addHandler(fh) 19 logger.addHandler(ch) 20 21 22 if __name__ == "__main__": 23 InitLog() 24 25 try: 26 # 创建 TCP socket 作为监听 socket 27 listen_fd = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0) 28 except socket.error as msg: 29 logger.error("create socket failed") 30 31 try: 32 # 设置 SO_REUSEADDR 选项 33 listen_fd.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 34 except socket.error as msg: 35 logger.error("setsocketopt SO_REUSEADDR failed") 36 37 try: 38 # 进行 bind -- 此处未指定 ip 地址,即 bind 了全部网卡 ip 上 39 listen_fd.bind((\'\', 2003)) 40 except socket.error as msg: 41 logger.error("bind failed") 42 43 try: 44 # 设置 listen 的 backlog 数 45 listen_fd.listen(10) 46 except socket.error as msg: 47 logger.error(msg) 48 49 try: 50 # 创建 epoll 句柄 51 epoll_fd = select.epoll() 52 # 向 epoll 句柄中注册 监听 socket 的 可读 事件 53 epoll_fd.register(listen_fd.fileno(), select.EPOLLIN) 54 except select.error as msg: 55 logger.error(msg) 56 57 connections = {} 58 addresses = {} 59 datalist = {} 60 while True: 61 # epoll 进行 fd 扫描的地方 -- 未指定超时时间则为阻塞等待 62 epoll_list = epoll_fd.poll() 63 64 for fd, events in epoll_list: 65 # 若为监听 fd 被激活 66 if fd == listen_fd.fileno(): 67 # 进行 accept -- 获得连接上来 client 的 ip 和 port,以及 socket 句柄 68 conn, addr = listen_fd.accept() 69 logger.debug("accept connection from %s, %d, fd = %d" % (addr[0], addr[1], conn.fileno())) 70 # 将连接 socket 设置为 非阻塞 71 conn.setblocking(0) 72 # 向 epoll 句柄中注册 连接 socket 的 可读 事件 73 epoll_fd.register(conn.fileno(), select.EPOLLIN | select.EPOLLET) 74 # 将 conn 和 addr 信息分别保存起来 75 connections[conn.fileno()] = conn 76 addresses[conn.fileno()] = addr 77 elif select.EPOLLIN & events: 78 # 有 可读 事件激活 79 datas = \'\' 80 while True: 81 try: 82 # 从激活 fd 上 recv 10 字节数据 83 data = connections[fd].recv(10) 84 # 若当前没有接收到数据,并且之前的累计数据也没有 85 if not data and not datas: 86 # 从 epoll 句柄中移除该 连接 fd 87 epoll_fd.unregister(fd) 88 # server 侧主动关闭该 连接 fd 89 connections[fd].close() 90 logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) 91 break 92 else: 93 # 将接收到的数据拼接保存在 datas 中 94 datas += data 95 except socket.error as msg: 96 # 在 非阻塞 socket 上进行 recv 需要处理 读穿 的情况 97 # 这里实际上是利用 读穿 出 异常 的方式跳到这里进行后续处理 98 if msg.errno == errno.EAGAIN: 99 logger.debug("%s receive %s" % (fd, datas)) 100 # 将已接收数据保存起来 101 datalist[fd] = datas 102 # 更新 epoll 句柄中连接d 注册事件为 可写 103 epoll_fd.modify(fd, select.EPOLLET | select.EPOLLOUT) 104 break 105 else: 106 # 出错处理 107 epoll_fd.unregister(fd) 108 connections[fd].close() 109 logger.error(msg) 110 break 111 elif select.EPOLLHUP & events: 112 # 有 HUP 事件激活 113 epoll_fd.unregister(fd) 114 connections[fd].close() 115 logger.debug("%s, %d closed" % (addresses[fd][0], addresses[fd][1])) 116 elif select.EPOLLOUT & events: 117 # 有 可写 事件激活 118 sendLen = 0 119 # 通过 while 循环确保将 buf 中的数据全部发送出去 120 while True: 121 # 将之前收到的数据发回 client -- 通过 sendLen 来控制发送位置 122 sendLen += connections[fd].send(datalist[fd][sendLen:]) 123 # 在全部发送完毕后退出 while 循环 124 if sendLen == len(datalist[fd]): 125 break 126 # 更新 epoll 句柄中连接 fd 注册事件为 可读 127 epoll_fd.modify(fd, select.EPOLLIN | select.EPOLLET) 128 else: 129 # 其他 epoll 事件不进行处理 130 continue
selectors模块
selectors模块是select多路复用封装的加强版,在windows中会选择select,在linux中会选择epoll
-
win: select
-
linux : select poll epoll
通常是用户空间创建fd,然后copy到内核空间,如果是开fd的数量多,select的的效率低
1 import selectors 2 import socket 3 4 sock = socket.socket() 5 sock.bind(("0.0.0.0",9999)) 6 7 sock.listen(1000) # 一定要加listen,不然无法连接端口 8 9 sock.setblocking(False) # 非阻塞模式 10 11 sel = selectors.DefaultSelector() # 实例化一个对象,会根据不同平台自动设置优先级 12 # epoll|kqueue|devpoll > poll > select. 所以Linux系统会自动设置成epoll win 自动设置成select 13 14 15 # 第一步 16 def accept(sock): # mask 是没有用的 17 conn,addr = sock.accept() 18 print("accepted:",conn,"from:",addr) 19 """ 20 conn: 21 <socket.socket fd=540, 22 family=AddressFamily.AF_INET, 23 type=SocketKind.SOCK_STREAM, 24 proto=0, laddr=(\'127.0.0.1\', 9999), 25 raddr=(\'127.0.0.1\', 58702)> 26 27 addr: (\'127.0.0.1\', 58702) 28 """ 29 conn.setblocking(False) 30 31 sel.register(conn,selectors.EVENT_READ,read) # 注册了,把conn与read函数绑定 32 33 34 # 第三步 35 def read(conn): 36 data = conn.recv(1024) 37 if data: 38 """ 39 >>> s = "hello, world." 40 >>> str(s) 41 \'hello, world.\' 42 >>> repr(s) 43 "\'hello, world.\'" 44 45 """ 46 print("echoing",repr(data),"to",conn) # repr()与str()都是转为字符,repr()输出对python比较有好,str()输出对用户比较友好 47 """ 48 data: b\'lls\' 49 50 conn: 51 <socket.socket fd=540, 52 family=Ad以上是关于事件驱动与异步IO使用的主要内容,如果未能解决你的问题,请参考以下文章
python下:事件驱动与 阻塞IO非阻塞IOIO多路复用异步IO