IO并发
Posted -xiaolong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了IO并发相关的知识,希望对你有一定的参考价值。
IO分类
IO分类:阻塞IO,非阻塞IO,IO多路复用,异步IO等
阻塞IO
- 定义:在执行IO操作时如果执行条件不满足则阻塞。阻塞IO是IO的默认形态。
- 效率:阻塞IO是效率很低的一种IO。但是由于逻辑简单所以是默认IO行为。
- 阻塞情况:
- 因为某种执行条件没有满足造成的函数阻塞
如:accept input recv等
- 处理IO的时间较长产生的阻塞状态
如:网络传输,大文件读写等
非阻塞IO
- 定义:通过修改IO属性行为,使原本阻塞的IO变为非阻塞的状态
- 设置套接字为非阻塞IO
sockfd.setblocking(bool)
功能:设置套接字为非阻塞IO
参数:默认为True,表示套接字IO阻塞;设置为False则套接字IO变为非阻塞
- 超时检测:设置一个最长阻塞时间,超过该时间后则不再阻塞等待。
sockfd.settimeout(sec)
功能:设置套接字的超时时间
参数:设置的时间
代码示例:
1 from socket import * 2 from time import * 3 4 # 日志文件 5 f = open(‘log.txt‘,‘a+‘) 6 7 # tcp 服务端 8 sockfd = socket() 9 sockfd.bind((‘0.0.0.0‘,8888)) 10 sockfd.listen(5) 11 12 # 非阻塞设置 13 # sockfd.setblocking(False) 14 15 # 超时时间 16 sockfd.settimeout(2) 17 18 19 while True: 20 print("Waiting from connect...") 21 try: 22 connfd,addr = sockfd.accept() 23 except (BlockingIOError,timeout) as e: 24 sleep(2) 25 f.write("%s : %s\n"%(ctime(),e)) 26 f.flush() 27 else: 28 print("Connect from",addr) 29 data = connfd.recv(1024).decode() 30 print(data)
IO多路复用
- 定义
同时监控多个IO事件,当那个IO事件准备就绪就执行哪个IO事件。以此形成可以同时处理多个IO的行为,避免一个IO阻塞造成其他IO均无法执行,提高了IO执行效率。
- 具体方案
select方法:Windows Linux unix
poll方法:Linux unix
epoll方法:Linux
select方法
rs, ws, xs=select(rlist, wlist, xlist[, timeout]) 功能: 监控IO事件,阻塞等待IO发生 参数:rlist 列表 存放关注的等待发生的IO事件 wlist 列表 存放关注的要主动处理的IO事件 xlist 列表 存放关注的出现异常要处理的IO timeout 超时时间 返回值: rs 列表 rlist中准备就绪的IO ws 列表 wlist中准备就绪的IO xs 列表 xlist中准备就绪的IO
代码示例:
1 from select import select 2 from socket import * 3 4 s = socket() 5 s.bind((‘0.0.0.0‘,8888)) 6 s.listen(3) 7 8 f = open(‘log.txt‘,‘r+‘) 9 10 print("开始监控IO") 11 rs,ws,xs = select([s],[],[]) 12 print(rs) 13 print(ws) 14 print(xs)
select实现TCP服务
【1】 将关注的IO放入对应的监控类别列表
【2】通过select函数进行监控
【3】遍历select返回值列表,确定就绪IO事件
【4】处理发生的IO事件
注意
- wlist中如果存在IO事件,则select立即返回给ws
- 处理IO过程中不要出现死循环占有服务端的情况
- IO多路复用消耗资源较少,效率较高
select tcp代码示例:
1 from socket import * 2 from select import select 3 4 # 创建监听套接字 5 s = socket() 6 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 7 s.bind((‘0.0.0.0‘,8888)) 8 s.listen(5) 9 10 # 设置关注的IO列表 11 rlist = [s] # s 用于等待处理连接 12 wlist = [] 13 xlist = [] 14 15 # 循环IO监控 16 while True: 17 # print("++++",rlist) 18 rs,ws,xs = select(rlist,wlist,xlist) 19 # print(‘----‘,rs) 20 # 遍历返回值列表,判断哪个IO就绪 21 for r in rs: 22 if r is s: 23 c,addr = r.accept() 24 print("Connect from",addr) 25 rlist.append(c) # 增加新的关注的IO 26 else: 27 # 表明有客户端发送消息 28 data = r.recv(1024).decode() 29 print(data) 30 r.send(b‘OK‘) 31 32 for w in ws: 33 pass 34 35 for x in xs: 36 pass
poll方法
p = select.poll() 功能 : 创建poll对象 返回值: poll对象 p.register(fd,event) 功能: 注册关注的IO事件 参数:fd 要关注的IO event 要关注的IO事件类型 常用类型:POLLIN 读IO事件(rlist) POLLOUT 写IO事件 (wlist) POLLERR 异常IO (xlist) POLLHUP 断开连接 e.g. p.register(sockfd,POLLIN|POLLERR) p.unregister(fd) 功能:取消对IO的关注 参数:IO对象或者IO对象的fileno events = p.poll() 功能: 阻塞等待监控的IO事件发生 返回值: 返回发生的IO events格式 [(fileno,event),()....] 每个元组为一个就绪IO,元组第一项是该IO的fileno,第二项为该IO就绪的事件类型
poll步骤
【1】 创建套接字
【2】 将套接字register
【3】 创建查找字典,并维护
【4】 循环监控IO发生
【5】 处理发生的IO
poll代码示例:
1 from socket import * 2 from select import * 3 4 # 创建监听套接字,作为关注的IO 5 s = socket() 6 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 7 s.bind((‘0.0.0.0‘,8888)) 8 s.listen(3) 9 10 # 创建poll对象 11 p = poll() 12 13 # 建立查找字典,通过IO的fileno查找io对象 14 # 始终与register的IO保持一直 15 fdmap = s.fileno():s 16 17 # 关注 s 18 p.register(s,POLLIN|POLLERR) 19 20 # 循环监控IO发生 21 while True: 22 events = p.poll() # 阻塞等待IO发生 23 # 循环遍历查看哪个IO准备就绪 24 for fd,event in events: 25 print(events) 26 if fd == s.fileno(): 27 c,addr = fdmap[fd].accept() 28 print("Connect from",addr) 29 # 关注客户端连接套接字 30 p.register(c,POLLIN|POLLHUP) 31 fdmap[c.fileno()] = c # 维护字典 32 elif event & POLLIN: 33 data = fdmap[fd].recv(1024).decode() 34 if not data: 35 p.unregister(fd) # 取消监控 36 fdmap[fd].close() 37 del fdmap[fd] # 从字典删除 38 continue 39 print(data) 40 p.register(fdmap[fd],POLLOUT) 41 elif event & POLLOUT: 42 fdmap[fd].send(b‘OK‘) 43 p.register(fdmap[fd], POLLIN)
epoll方法
使用方法:
- 基本与poll相同
- 生成对象改为epoll()
- 将所有事件类型改为EPOLL类型
epoll特点
- epoll效率比select poll要高
- epoll监控IO数量方式比poll要多
- epoll的触发方式比poll要多(EPOLLET边缘触发)
epoll完成tcp并发代码示例:
1 from socket import * 2 from select import * 3 4 # 创建监听套接字,作为关注的IO 5 s = socket() 6 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 7 s.bind((‘0.0.0.0‘,8888)) 8 s.listen(3) 9 10 # 创建epoll对象 11 ep = epoll() 12 13 # 建立查找字典,通过IO的fileno查找io对象 14 # 始终与register的IO保持一直 15 fdmap = s.fileno():s 16 17 # 关注 s 18 ep.register(s,EPOLLIN|EPOLLERR) 19 20 # 循环监控IO发生 21 while True: 22 events = ep.poll() # 阻塞等待IO发生 23 print("你有新的IO需要处理哦") 24 # 循环遍历查看哪个IO准备就绪 25 for fd,event in events: 26 print(events) 27 if fd == s.fileno(): 28 c,addr = fdmap[fd].accept() 29 print("Connect from",addr) 30 # 关注客户端连接套接字 31 ep.register(c,EPOLLIN|EPOLLET) # 设置边缘触发 32 fdmap[c.fileno()] = c # 维护字典 33 # elif event & EPOLLIN: 34 # data = fdmap[fd].recv(1024).decode() 35 # if not data: 36 # ep.unregister(fd) # 取消监控 37 # fdmap[fd].close() 38 # del fdmap[fd] # 从字典删除 39 # continue 40 # print(data) 41 # ep.unregister(fd) # 先取消关注再重新添加 42 # ep.register(fdmap[fd], EPOLLOUT) 43 # elif event & POLLOUT: 44 # fdmap[fd].send(b‘OK‘) 45 # ep.unregister(fd) 46 # ep.register(fdmap[fd], EPOLLIN)
协程技术
基础概念
- 定义:纤程,微线程。是允许在不同入口点不同位置暂停的计算机程序,简单来说,协程就是可以暂停执行的函数。
- 协程原理:记录一个函数的上下文,协程调度切换时会将记录的上下文保存,在切换回来时进行调取,恢复原有的执行内容,以便从上一次执行位置继续执行
- 协程优缺点:
优点:
- 协程完成多任务占用计算资源很少
- 由于协程的多任务切换在应用层完成,因此切换开销少
- 协程为单线程程序,无需进行共享资源同步互斥处理
缺点:
- 协程的本质是一个单线程,无法利用计算机多核资源
扩展延申@标准库协程的实现
python3.5以后,使用标准库asyncio和async/await 语法来编写并发代码。asyncio库通过对异步IO行为的支持完成python的协程。虽然官方说asyncio是未来的开发方向,但是由于其生态不够丰富,大量的客户端不支持awaitable需要自己去封装,所以在使用上存在缺陷。更多时候只能使用已有的异步库(asyncio等),功能有限。
第三方协程模块
greenlet模块
- 安装:sudo pip3 install greenlet
- 函数
greenlet.greenlet(func) 功能:创建协程对象 参数:协程函数 g.switch() 功能:选择要执行的协程函数
协程行为代码示例:
1 from greenlet import greenlet 2 3 def fun1(): 4 print("执行 fun1") 5 gr2.switch() 6 print("结束 fun1") 7 gr2.switch() 8 9 def fun2(): 10 print("执行 fun2") 11 gr1.switch() 12 print("结束 fun2") 13 14 # 将函数变为协程 15 gr1 = greenlet(fun1) 16 gr2 = greenlet(fun2) 17 18 gr1.switch() # 选择执行的协程函数
gevent模块
安装:sudo pip3 install gevent
函数:
gevent.spawn(func,argv) 功能: 生成协程对象 参数:func 协程函数 argv 给协程函数传参(不定参) 返回值: 协程对象 gevent.joinall(list,[timeout]) 功能: 阻塞等待协程执行完毕 参数:list 协程对象列表 timeout 超时时间 gevent.sleep(sec) 功能: gevent睡眠阻塞 参数:睡眠时间 * gevent协程只有在遇到gevent指定的阻塞行为时才会自动在协程之间进行跳转 如gevent.joinall(),gevent.sleep()带来的阻塞
gevent生成协程示例:
1 import gevent 2 from gevent import monkey 3 monkey.patch_time() # 修改对time模块中阻塞的解释行为 4 from time import sleep 5 6 # 协程函数 7 def foo(a,b): 8 print("Running foo ...",a,b) 9 # gevent.sleep(3) 10 sleep(3) 11 print("Foo again..") 12 13 def bar(): 14 print("Running bar ...") 15 # gevent.sleep(2) 16 sleep(2) 17 print("Bar again..") 18 19 # 生成协程对象 20 f = gevent.spawn(foo,1,2) 21 g = gevent.spawn(bar) 22 23 gevent.joinall([f,g]) #阻塞等待f,g代表的协程执行完毕
基于协程的TCP开发代码示例:
1 import gevent 2 from gevent import monkey 3 monkey.patch_all() # 执行脚本,修改socket 4 from socket import * 5 6 def handle(c): 7 while True: 8 data = c.recv(1024).decode() 9 if not data: 10 break 11 print(data) 12 c.send(b‘OK‘) 13 c.close() 14 15 # 创建tcp套接字 16 s = socket() 17 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 18 s.bind((‘0.0.0.0‘,8888)) 19 s.listen(5) 20 21 # 循环接收来自客户端连接 22 while True: 23 c,addr = s.accept() 24 print("Connect from",addr) 25 # handle(c) # 处理具体客户端请求 26 gevent.spawn(handle,c) # 协程方案
- monkey脚本
作用:在gevent协程中,协程只有遇到gevent指定类型的阻塞才能跳转到其他协程,因此,我们希望将普通IO阻塞行为转换为可以触发gevent协程跳转的阻塞,以提高执行效率。
转换方法:gevent提供了一个脚本程序monkey,可以修改底层解释IO阻塞的行为,将很多普通阻塞转换为gevent阻塞。
使用方法
【1】 导入monkey from gevent import monkey 【2】 运行相应的脚本,例如转换socket中所有阻塞 monkey.patch_socket() 【3】 如果将所有可转换的IO阻塞全部转换则运行all monkey.patch_all() 【4】 注意:脚本运行函数需要在对应模块导入前执行
基于协程的TCP开发代码示例:
1 """ 2 思路 : 1. 每个客户函数端设置为协成 3 2. 将socket模块下的阻塞变为可以触发协程跳转 4 """ 5 import gevent 6 from gevent import monkey 7 monkey.patch_all() # 执行脚本,修改socket 8 from socket import * 9 10 def handle(c): 11 while True: 12 data = c.recv(1024).decode() 13 if not data: 14 break 15 print(data) 16 c.send(b‘OK‘) 17 c.close() 18 19 # 创建tcp套接字 20 s = socket() 21 s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1) 22 s.bind((‘0.0.0.0‘,8888)) 23 s.listen(5) 24 25 # 循环接收来自客户端连接 26 while True: 27 c,addr = s.accept() 28 print("Connect from",addr) 29 # handle(c) # 处理具体客户端请求 30 gevent.spawn(handle,c) # 协程方案
扩展:位运算
定义:将整数转换为二进制,按二进制位进行运算
运算符号:
& 按位与 | 按位或 ^ 按位异或 << 左移 >> 右移
14 --> 01110 19 --> 10011 14 & 19 = 00010 = 2 一0则0 14 | 19 = 11111 = 31 一1则1 14 ^ 19 = 11101 = 29 相同为0不同为1 14 << 2 = 111000 = 56 向左移动低位补0 14 >> 2 = 11 = 3 向右移动去掉低位
以上是关于IO并发的主要内容,如果未能解决你的问题,请参考以下文章