Python并发编程-事件驱动模型
Posted 小L
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python并发编程-事件驱动模型相关的知识,希望对你有一定的参考价值。
一、事件驱动模型介绍
1、传统的编程模式
例如:线性模式大致流程
开始--->代码块A--->代码块B--->代码块C--->代码块D--->......--->结束
每一个代码块里是完成各种各样事情的代码,但编程者知道代码块A,B,C,D...的执行顺序,唯一能够改变这个流程的是数据。输入不同的数据,根据条件语句判断,流程或许就改为A--->C--->E...--->结束。每一次程序运行顺序或许都不同,但它的控制流程是由输入数据和你编写的程序决定的。如果你知道这个程序当前的运行状态(包括输入数据和程序本身),那你就知道接下来甚至一直到结束它的运行流程。
例如:事件驱动型程序模型大致流程
开始--->初始化--->等待
与上面传统编程模式不同,事件驱动程序在启动之后,就在那等待,等待什么呢?等待被事件触发。传统编程下也有“等待”的时候,比如在代码块D中,你定义了一个input(),需要用户输入数据。但这与下面的等待不同,传统编程的“等待”,比如input(),你作为程序编写者是知道或者强制用户输入某个东西的,或许是数字,或许是文件名称,如果用户输入错误,你还需要提醒他,并请他重新输入。事件驱动程序的等待则是完全不知道,也不强制用户输入或者干什么。只要某一事件发生,那程序就会做出相应的“反应”。这些事件包括:输入信息、鼠标、敲击键盘上某个键还有系统内部定时器触发。
2、事件驱动模型
通常,我们写服务器处理模型的程序时,有以下几种模型:
(1)每收到一个请求,创建一个新的进程,来处理该请求; (2)每收到一个请求,创建一个新的线程,来处理该请求; (3)每收到一个请求,放入一个事件列表,让主进程通过非阻塞I/O方式来处理请求
什么是事件驱动模型 ?
目前大部分的UI编程都是事件驱动模型,如很多UI平台都会提供onClick()事件,这个事件就代表鼠标按下事件。事件驱动模型大体思路如下:
- 有一个事件(消息)队列;
- 鼠标按下时,往这个队列中增加一个点击事件(消息);
- 有个循环,不断从队列取出事件,根据不同的事件,调用不同的函数,如onClick()、onKeyDown()等;
- 事件(消息)一般都各自保存各自的处理函数指针,这样,每个消息都有独立的处理函数;
事件驱动编程是一种编程范式,这里程序的执行流由外部事件来决定。它的特点是包含一个事件循环,当外部事件发生时使用回调机制来触发相应的处理。另外两种常见的编程范式是(单线程)同步以及多线程编程。
二、IO模型准备
在进行解释之前,首先要说明几个概念:
- 用户空间和内核空间
- 进程切换
- 进程的阻塞
- 文件描述符
- 缓存 I/O
1、用户空间和内核空间
例如:采用虚拟存储器,对于32bit操作系统,它的寻址空间(虚拟存储空间为4G,即2的32次方)。
操作系统的核心是内核,独立于普通的应用程序,可以访问受保护的内存空间,也可以访问底层硬件的所有权限。
为了保证用户进程不能直接操作内核(kernel),保证内核的安全,操作系统将虚拟空间划分为两部分:一部分为内核空间,另一部分为用户空间。
那么操作系统是如何分配空间的?这里就会涉及到内核态和用户态的两种工作状态。
1G: 0 --->内核态
3G: 1 --->用户态
CPU的指令集,是通过0和1 决定你是用户态,还是内核态
计算机的两种工作状态: 内核态和用户态
cpu的两种工作状态:
现在的操作系统都是分时操作系统,分时的根源,来自于硬件层面操作系统内核占用的内存与应用程序占用的内存彼此之间隔离。cpu通过psw(程序状态寄存器)中的一个2进制位来控制cpu本身的工作状态,即内核态与用户态。
内核态:操作系统内核只能运作于cpu的内核态,这种状态意味着可以执行cpu所有的指令,可以执行cpu所有的指令,这也意味着对计算机硬件资源有着完全的控制权限,并且可以控制cpu工作状态由内核态转成用户态。
用户态:应用程序只能运作于cpu的用户态,这种状态意味着只能执行cpu所有的指令的一小部分(或者称为所有指令的一个子集),这一小部分指令对计算机的硬件资源没有访问权限(比如I/O),并且不能控制由用户态转成内核态。
2、进程切换
为了控制进程的执行,内核必须有能力挂起正在CPU上执行的进程,并恢复以前挂起的某个进程的执行,这种行为就被称为进程切换。
总结:进程切换是很消耗资源的。
3、进程的阻塞
正在执行的进程,由于期待的某些事件未发生,如请求系统资源失败、等待某种操作的完成、新数据尚未到达或无新工作做等,则由系统自动执行阻塞原语(Block),使自己由运行状态变为阻塞状态。可见,进程的阻塞是进程自身的一种主动行为,也因此只有处于运行态的进程(获得CPU),才可能将其转为阻塞状态。当进程进入阻塞状态,是不占用CPU资源的。
4、文件描述符fd
文件描述符(File descriptor)是计算机科学中的一个术语,是一个用于表述指向文件的引用的抽象化概念。
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
5、缓存 I/O
缓存 I/O 又被称作标准 I/O,大多数文件系统的默认 I/O 操作都是缓存 I/O。在 Linux 的缓存 I/O 机制中,操作系统会将 I/O 的数据缓存在文件系统的页缓存( page cache )中,也就是说,数据会先被拷贝到操作系统内核的缓冲区中,然后才会从操作系统内核的缓冲区拷贝到应用程序的地址空间。用户空间没法直接访问内核空间的,内核态到用户态的数据拷贝。
缓存 I/O 的缺点:
数据在传输过程中需要在应用程序地址空间和内核进行多次数据拷贝操作,这些数据拷贝操作所带来的 CPU 以及内存开销是非常大的。
本文讨论的背景是Linux环境下的network IO。
IO发生时涉及的对象和步骤:
对于一个network IO (这里我们以read举例),它会涉及到两个系统对象,
1、一个是调用这个IO的process (or thread),
2、另一个就是系统内核(kernel)。
当一个read操作发生时,它会经历两个阶段:
1、等待数据准备 (Waiting for the data to be ready)
2、将数据从内核拷贝到进程中 (Copying the data from the kernel to the process)
记住这两点很重要,因为这些IO Model的区别就是在两个阶段上各有不同的情况。
常见的几种IO 模型:
- blocking IO (阻塞IO)
- nonblocking IO (非阻塞IO)
- IO multiplexing (IO多路复用)
- signal driven IO (信号驱动式IO)
- asynchronous IO (异步IO)
一、不常用的IO模型
1、信号驱动IO模型(Signal-driven IO)
使用信号,让内核在描述符就绪时发送SIGIO信号通知应用程序,称这种模型为信号驱动式I/O(signal-driven I/O)。
原理图:
首先开启套接字的信号驱动式I/O功能,并通过sigaction系统调用安装一个信号处理函数。该系统调用将立即返回,我们的进程继续工作,也就是说进程没有被阻塞。当数据报准备好读取时,内核就为该进程产生一个SIGIO信号。随后就可以在信号处理函数中调用recvfrom读取数据报,并通知主循环数据已经准备好待处理,也可以立即通知主循环,让它读取数据报。
无论如何处理SIGIO信号,这种模型的优势在于等待数据报到达期间进程不被阻塞。主循环可以继续执行 ,只要等到来自信号处理函数的通知:既可以是数据已准备好被处理,也可以是数据报已准备好被读取。
二、常用的四种IO模型:
1、 blocking IO(阻塞IO模型)
原理图:
示例:一收一发程序会进入死循环
server.py
#!/usr/bin/env python # -*- coding:utf-8 -*- #Author: nulige import socket sk=socket.socket() sk.bind(("127.0.0.1",8080)) sk.listen(5) while 1: conn,addr=sk.accept() while 1: conn.send("hello client".encode("utf8")) data=conn.recv(1024) print(data.decode("utf8"))
client.py
#!/usr/bin/env python # -*- coding:utf-8 -*- #Author: nulige import socket sk=socket.socket() sk.connect(("127.0.0.1",8080)) while 1: data=sk.recv(1024) print(data.decode("utf8")) sk.send(b"hello server")
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。
所以,blocking IO的特点就是在IO执行的两个阶段都被block了。
2、non-blocking IO(非阻塞IO)
原理图:
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是它可以再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存,然后返回。
所以,用户进程其实是需要不断的主动询问kernel数据好了没有。
注意:
在网络IO时候,非阻塞IO也会进行recvform系统调用,检查数据是否准备好,与阻塞IO不一样,”非阻塞将大的整片时间的阻塞分成N多的小的阻塞, 所以进程不断地有机会 ‘被’ CPU光顾”。即每次recvform系统调用之间,cpu的权限还在进程手中,这段时间是可以做其他事情的,
也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
示例:
服务端:
import time import socket sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) sk.bind((\'127.0.0.1\',6667)) sk.listen(5) sk.setblocking(False) #设置成非阻塞状态 while True: try: print (\'waiting client connection .......\') connection,address = sk.accept() # 进程主动轮询 print("+++",address) client_messge = connection.recv(1024) print(str(client_messge,\'utf8\')) connection.close() except Exception as e: #捕捉错误 print (e) time.sleep(4) #每4秒打印一个捕捉到的错误
客户端:
import time import socket sk = socket.socket(socket.AF_INET,socket.SOCK_STREAM) while True: sk.connect((\'127.0.0.1\',6667)) print("hello") sk.sendall(bytes("hello","utf8")) time.sleep(2) break
缺点:
1、发送了太多系统调用数据
2、数据处理不及时
3、IO multiplexing(IO多路复用)
IO multiplexing这个词可能有点陌生,但是如果我说select,epoll,大概就都能明白了。有些地方也称这种IO方式为event driven IO。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。
IO多路复用的三种方式:
1、select--->效率最低,但有最大描述符限制,在linux为1024。
2、poll ---->和select一样,但没有最大描述符限制。
3、epoll --->效率最高,没有最大描述符限制,支持水平触发与边缘触发。
IO多路复用的优势:同时可以监听多个连接,用的是单线程,利用空闲时间实现并发。
注意:
Linux系统: select、poll、epoll
Windows系统:select
Mac系统:select、poll
原理图:
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。
这个图和blocking IO的图其实并没有太大的不同,事实上,还更差一些。因为这里需要使用两个system call (select 和 recvfrom),而blocking IO只调用了一个system call (recvfrom)。但是,用select的优势在于它可以同时处理多个connection。(多说一句。所以,如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。)
在IO multiplexing Model中,实际中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
注意1:select函数返回结果中如果有文件可读了,那么进程就可以通过调用accept()或recv()来让kernel将位于内核中准备到的数据copy到用户区。
注意2: select的优势在于可以处理多个连接,不适用于单个连接
示例:
server.py
import socket import select sk=socket.socket() sk.bind(("127.0.0.1",9904)) sk.listen(5) sk1=socket.socket() sk1.bind(("127.0.0.1",8090)) sk1.listen(3) while True: # sk.accept() #文件描述符 r,w,e=select.select([sk,sk1],[],[],5) #输入列表,输出列表,错误列表,5: 是监听5秒 for i in r: #[sk,sk1] 监听客户端的具体端口 conn,add=i.accept() print(conn) print("hello") print(\'>>>>>>\')
client.py
import socket sk=socket.socket() sk.connect(("127.0.0.1",8090)) while 1: inp=input(">>").strip() sk.send(inp.encode("utf8")) data=sk.recv(1024) print(data.decode("utf8"))
1、水平触发
只有高电平或低电平的时候才触发
1-----高电平---触发
0-----低电平---不触发
2、边缘触发
1---------高电平--------触发
0---------低电平--------触发
IO多路复用优势:同时可以监听多个连接
示例:select可以监控多个对象
服务端
import socket import select sk=socket.socket() sk.bind(("127.0.0.1",8801)) sk.listen(5) inputs=[sk,] while True: #监听sk和conn r,w,e=select.select(inputs,[],[],5) #conn发生变化,sk不变化就走else print(len(r)) #判断sk or conn 谁发生了变化 for obj in r: if obj==sk: conn,add=obj.accept() print(conn) inputs.append(conn) else: data_byte=obj.recv(1024) print(str(data_byte,\'utf8\')) inp=input(\'回答%s号客户>>>\'%inputs.index(obj)) obj.sendall(bytes(inp,\'utf8\')) print(\'>>\',r)
客户端:
import socket sk=socket.socket() sk.connect((\'127.0.0.1\',8801)) while True: inp=input(">>>>") sk.sendall(bytes(inp,"utf8")) data=sk.recv(1024) print(str(data,\'utf8\'))
输出结果:
#server >> [<socket.socket fd=276, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8801)>] 1 hello 回答1号客户>>>word >> [<socket.socket fd=344, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8801), raddr=(\'127.0.0.1\', 54388)>] 1 #clinet >>>>hello word
4、Asynchronous I/O(异步IO)
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
异步最大特点:全程无阻塞
synchronous IO(同步IO)和asynchronous IO(异步IO)的区别:
- A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes;
- An asynchronous I/O operation does not cause the requesting process to be blocked;
两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。(有一丁点阻塞,都是同步IO)按照这个定义,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO(同步IO)。
同步IO:包括 blocking IO、non-blocking、select、poll、epoll(故:epool只是伪异步而已)(有阻塞)
异步IO:包括:asynchronous (无阻塞)
5、selectors模块应用
python封装好的模块:selectors
selectors模块: 会选择一个最优的操作系统实现方式
示例:
select_module.py
import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() # Should be ready print(\'accepted\', conn, \'from\', addr) conn.setblocking(False) #设置成非阻塞 sel.register(conn, selectors.EVENT_READ, read) #conn绑定的是read def read(conn, mask): try: data = conn.recv(1000) # Should be ready if not data: raise Exception print(\'echoing\', repr(data), \'to\', conn) conn.send(data) # Hope it won\'t block except Exception as e: print(\'closing\', conn) sel.unregister(conn) #解除注册 conn.close() sock = socket.socket() sock.bind((\'localhost\', 8090)) sock.listen(100) sock.setblocking(False) #注册 sel.register(sock, selectors.EVENT_READ, accept) print("server....") while True: events = sel.select() #监听[sock,conn1,conn2] print("events",events) #拿到2个元素,一个key,一个mask for key, mask in events: # print("key",key) # print("mask",mask) callback = key.data #绑定的是read函数 # print("callback",callback) callback(key.fileobj, mask) #key.fileobj=sock,conn1,conn2
client.py
import socket sk=socket.socket() sk.connect(("127.0.0.1",8090)) while 1: inp=input(">>>") sk.send(inp.encode("utf8")) #发送内容 data=sk.recv(1024) #接收信息 print(data.decode("utf8")) #打印出来
输出结果:
#server server.... events [(SelectorKey(fileobj=<socket.socket fd=312, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090)>, fd=312, events=1, data=<function accept at 0x01512F60>), 1)] accepted <socket.socket fd=376, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090), raddr=(\'127.0.0.1\', 57638)> from (\'127.0.0.1\', 57638) events [(SelectorKey(fileobj=<socket.socket fd=376, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090), raddr=(\'127.0.0.1\', 57638)>, fd=376, events=1, data=<function read at 0x015C26A8>), 1)] echoing b\'hello\' to <socket.socket fd=376, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090), raddr=(\'127.0.0.1\', 57638)> events [(SelectorKey(fileobj=<socket.socket fd=312, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090)>, fd=312, events=1, data=<function accept at 0x01512F60>), 1)] accepted <socket.socket fd=324, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090), raddr=(\'127.0.0.1\', 57675)> from (\'127.0.0.1\', 57675) events [(SelectorKey(fileobj=<socket.socket fd=324, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090), raddr=(\'127.0.0.1\', 57675)>, fd=324, events=1, data=<function read at 0x015C26A8>), 1)] echoing b\'uuuu\' to <socket.socket fd=324, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090), raddr=(\'127.0.0.1\', 57675)> events [(SelectorKey(fileobj=<socket.socket fd=324, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090), raddr=(\'127.0.0.1\', 57675)>, fd=324, events=1, data=<function read at 0x015C26A8>), 1)] closing <socket.socket fd=324, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090), raddr=(\'127.0.0.1\', 57675)> events [(SelectorKey(fileobj=<socket.socket fd=312, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090)>, fd=312, events=1, data=<function accept at 0x01512F60>), 1)] accepted <socket.socket fd=324, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090), raddr=(\'127.0.0.1\', 57876)> from (\'127.0.0.1\', 57876) events [(SelectorKey(fileobj=<socket.socket fd=324, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090), raddr=(\'127.0.0.1\', 57876)>, fd=324, events=1, data=<function read at 0x015C26A8>), 1)] echoing b\'welcome\' to <socket.socket fd=324, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(\'127.0.0.1\', 8090), raddr=(\'127.0.0.1\', 57876)> #clinet (启动两个client) >>>hello hello >>>welcome welcome
以上是关于Python并发编程-事件驱动模型的主要内容,如果未能解决你的问题,请参考以下文章