Python并发编程基础 №⑧ 并发完结篇:IO模型

Posted 四方游览

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python并发编程基础 №⑧ 并发完结篇:IO模型相关的知识,希望对你有一定的参考价值。

1、详细介绍

  

为了更好地了解IO模型,我们需要事先回顾下:同步、异步、阻塞、非阻塞

同步(synchronous):就是一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成,
这是一种可靠的任务序列。要么成功都成功,失败都失败,两个任务的状态可以保持一致。

异步(asynchronous):是不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作,依赖的任务也立即执行,
只要自己完成了整个任务就算完成了。至于被依赖的任务最终是否真正完成,依赖它的任务无法确定,所以它是不可靠的任务序列。

阻塞和非阻塞这两个概念与程序(线程)等待消息通知(无所谓同步或者异步)
时的状态有关。也就是说阻塞与非阻塞主要是程序(线程)等待消息通知时的状态角度来说的。

简言之:
同步 提交一个任务之后要等待这个任务执行完毕
异步 只管提交任务,不等待这个任务执行完毕就可以做其他事情
阻塞 recv recvfrom accept
非阻塞

Richard Stevens 提出的在Linux环境下的network IO的五种模型:
1: blocking IO 阻塞IO
2: nonblocking IO 非阻塞IO
3: IO multiplexing IO多路复用
4. signal driven IO 信号驱动IO
5: asynchronous IO 异步IO

再说一下IO发生时涉及的对象和步骤。对于一个networkO(这里我们以read举例),它会涉及到两个系统对象,
一个是调用这个IO的process( or thread),
另一个就是系统内核(kernel)。当一个read操作发生时,该操作会经历两个阶段:

1)等待数据准备 (Waiting for the data to be ready)
2)将数据从内核拷贝到进程中(Copying the data from the kernel to the process)


接下来举例说明常用的前三种模型
阻塞IO,经常遇到。在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
1)用户进程,调用了recvfrom这个系统方法,kernel就开始了IO的第一个阶段:准备数据;
对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。
2) 系统进程:先准备数据,然后拷贝数据,最后将结果返回给用户进程。
而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,
用户进程才解除block的状态,重新运行起来。

所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。

实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,
如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。

一个简单的解决方案
  在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),
这样任何一个连接的阻塞都不会影响其他的连接。

该方案的问题是:
  开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,
降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。

改进方案
很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,
并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。
这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。

改进后方案其实也存在着问题:

“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,
当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,
并根据响应规模调整“池”的大小。

对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是
不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,
可以用非阻塞接口来尝试解决这个问题。


2_非阻塞IO(noneblocking IO)

  linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking 执行读操作时,流程是这个样子:

 

 

    从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一
个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,
它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。

一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是
阻塞的),然后返回。也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,
此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvfrom
系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据
整个过程,进程仍然是属于阻塞的状态。\'\'\'
所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。

实例

 1 import socket
 2 
 3 sk = socket.socket()
 4 sk.bind((\'127.0.0.1\', 8888))
 5 sk.listen(5)
 6 sk.setblocking(False)
 7 
 8 conn_list = []
 9 del_list = []
10 while 1:
11 
12     try:
13         conn, addr = sk.accept()  # 此处不再阻塞
14         conn_list.append(conn)
15 
16     except BlockingIOError:
17         if conn_list:
18             print(conn_list)
19 
20         for conn in conn_list:
21             try:
22                 msg = conn.recv(1024).decode(\'utf-8\') # 此处不再阻塞
23                 if not msg:
24                     del_list.append(conn)
25                     continue
26                 conn.send(b\'hi\')
27             except BlockingIOError:
28                 pass
29             except ConnectionResetError:
30                 del_list.append(conn)
31 
32         for c in del_list:
33             conn_list.remove(c)
34             c.close()
35 
36         del_list.clear()
37 
38     # sk.close()
View Code
但是非阻塞IO模型绝不被推荐。
我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。
但是也难掩其缺点:
1. 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
2. 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。

此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,
例如select()多路复用模式,可以一次检测多个连接是否活跃。

实现IO复用中的三个API(select、poll和epoll)的区别和联系
select,poll,epoll都是IO多路复用的机制,I/O多路复用就是通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读
就绪或者写就绪),能够通知应用程序进行相应的读写操作。但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件
就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝
到用户空间。三者的原型如下所示:
1 int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
2 
3 int poll(struct pollfd *fds, nfds_t nfds, int timeout);
4 
5 int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
这三种IO多路复用模型在不同的平台有着不同的支持,而epoll在windows下就不支持,好在我们有selectors模块,帮我们默认选择当前平台下最合适的
 1 # 服务器端
 2 
 3 import selectors
 4 from socket import *
 5 
 6 
 7 def accept(sk, mask):
 8     conn, addr = sk.accept()
 9     sel.register(conn, selectors.EVENT_READ, read)
10 
11 
12 def read(conn, mask):
13     try:
14         msg = conn.recv(1024)
15         if not msg:
16             print(\'closing connection:\', conn)
17             sel.unregister(conn)
18             conn.close()
19             return
20         conn.send(b\'good bye\')
21     except Exception:
22             print(\'closing connection:\', conn)
23             sel.unregister(conn)
24             conn.close()
25 
26 sk = socket()
27 sk.bind((\'127.0.0.1\', 8888))
28 sk.listen(5)
29 
30 # #设置socket的接口为非阻塞
31 sk.setblocking(False)
32 # 选择一个适合我的IO多路复用的机制
33 sel = selectors.DefaultSelector()
34 
35 sel.register(sk, selectors.EVENT_READ, accept)
36 \'\'\'相当于往select的读列表里append了一个sk对象,并且绑定了一个回调函数accept
37 说白了就是 如果有人请求连接sk,就调用accept方法\'\'\'
38 
39 while 1:
40     events = sel.select() # 检测所有的sk,conn,是否有完成wait data阶段
41     for sel_obj, mask in events:
42         callback = sel_obj.data  # callback=read
43         callback(sel_obj.fileobj, mask)  # read(sk, mask)
44 
45 
46 # 与前例公用的客户端
47 import socket
48 
49 sk = socket.socket()
50 sk.connect((\'127.0.0.1\', 8888))
51 
52 try:
53     while 1:
54         sk.send(b\'hello\')
55         print(sk.recv(1024).decode(\'utf-8\'))
56         ipt = input(\'>>>\').encode(\'utf-8\')
57         sk.send(ipt)
58 except:
59     sk.close()
View Code

 

3_多路复用IO(IO multiplexing)

 

 

 

 1 # 服务器端
 2 
 3 import socket
 4 import select
 5 
 6 sk = socket.socket()
 7 sk.bind((\'127.0.0.1\', 8888))
 8 sk.listen(5)
 9 
10 sk.setblocking(False)
11 
12 read_list = [sk]
13 
14 while 1:
15 
16     r_lst, w_lst, x_lst = select.select(read_list, [], [])
17 
18     for r in r_lst:
19         if r is sk:
20             conn, addr = r.accept()
21             read_list.append(conn)
22         else:
23             ret = r.recv(1024).decode(\'utf-8\')
24             if ret == \'\':
25                 r.close()
26                 read_list.remove(r)
27                 continue
28             print(ret)
29             r.send(b\'goodbye\')
30 
31 # 客户端
32 
33 import socket
34 
35 sk = socket.socket()
36 sk.connect((\'127.0.0.1\', 8888))
37 
38 try:
39     while 1:
40         sk.send(b\'hello\')
41         print(sk.recv(1024).decode(\'utf-8\'))
42         ipt = input(\'>>>\').encode(\'utf-8\')
43         sk.send(ipt)
44 except:
45     sk.close()

 

 
 

以上是关于Python并发编程基础 №⑧ 并发完结篇:IO模型的主要内容,如果未能解决你的问题,请参考以下文章

透彻学习Python异步编程——模块asyncio之基础篇

python 并发编程 协程 greenlet模块

Python并发编程之初识异步IO框架:asyncio 上篇

python语法基础-并发编程-协程-长期维护

Python爬虫速度很慢?并发编程了解一下吧

python网络编程基础(线程与进程并行与并发同步与异步)