io多路复用
Posted qiaodongdong
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了io多路复用相关的知识,希望对你有一定的参考价值。
上篇回顾:静态服务器+压测
3.2.概念篇
1.同步与异步
同步是指一个任务的完成需要依赖另外一个任务时,只有等待被依赖的任务完成后,依赖的任务才能算完成。
异步是指不需要等待被依赖的任务完成,只是通知被依赖的任务要完成什么工作。然后继续执行下面代码逻辑,只要自己完成了整个任务就算完成了(异步一般使用状态、通知和回调)
PS:项目里面一般是这样的:(个人经验)
- 同步架构:一般都是和钱相关的需求,需要实时返回的业务
- 异步架构:更多是对写要求比较高时的场景(同步变异步)
- 读一般都是实时返回,代码一般都是
await xxx()
- 读一般都是实时返回,代码一般都是
- 想象个情景就清楚了:
- 异步:现在用户写了篇文章,可以异步操作,就算没真正写到数据库也可以返回:发表成功(大不了失败提示一下)
- 同步:用户获取订单信息,你如果异步就会这样了:提示下获取成功,然后一片空白...用户不卸载就怪了...
2.阻塞与非阻塞
阻塞是指调用结果返回之前,当前线程会被挂起,一直处于等待消息通知,不能够执行其他业务(大部分代码都是这样的)
非阻塞是指在不能立刻得到结果之前,该函数不会阻塞当前线程,而会立刻返回(继续执行下面代码,或者重试机制走起)
PS:项目里面重试机制为啥一般都是3次?
- 第一次重试,两台PC挂了也是有可能的
- 第二次重试,负载均衡分配的三台机器同时挂的可能性不是很大,这时候就有可能是网络有点拥堵了
- 最后一次重试,再失败就没意义了,日记写起来,再重试网络负担就加大了,得不偿失了
3.五种IO模型
对于一次IO访问,数据会先被拷贝到内核的缓冲区中,然后才会从内核的缓冲区拷贝到应用程序的地址空间。需要经历两个阶段:
- 准备数据
- 将数据从内核缓冲区拷贝到进程地址空间
由于存在这两个阶段,Linux产生了下面五种IO模型(以socket为例
)
- 阻塞式IO:
- 当用户进程调用了
recvfrom
等阻塞方法时,内核进入IO的第1个阶段:准备数据(内核需要等待足够的数据再拷贝)这个过程需要等待,用户进程会被阻塞,等内核将数据准备好,然后拷贝到用户地址空间,内核返回结果,用户进程才从阻塞态进入就绪态 - Linux中默认情况下所有的socket都是阻塞的
- 当用户进程调用了
- 非阻塞式IO:
- 当用户进程发出read操作时,如果
kernel
中的数据还没有准备好,那么它并不会block
用户进程,而是立刻返回一个error
。 - 用户进程判断结果是一个
error
时,它就知道数据还没有准备好,于是它可以再次发送read操作 - 一旦
kernel
中的数据准备好了,并且又再次收到了用户进程的system call
,那么它马上就将数据拷贝到了用户内存,然后返回 - 非阻塞IO模式下用户进程需要不断地询问内核的数据准备好了没有
- 当用户进程发出read操作时,如果
- IO多路复用:
- 通过一种机制,一个进程可以监视多个文件描述符(套接字描述符)一旦某个文件描述符就绪(一般是读就绪或者写就绪),能够通知程序进行相应的读写操作(这样就不需要每个用户进程不断的询问内核数据准备好了没)
- 常用的IO多路复用方式有
select
、poll
和epoll
- 信号驱动IO:
- 内核文件描述符就绪后,通过信号通知用户进程,用户进程再通过系统调用读取数据。
- 此方式属于同步IO(实际读取数据到用户进程缓存的工作仍然是由用户进程自己负责的)
- 异步IO(
POSIX
的aio_
系列函数)- 用户进程发起read操作之后,立刻就可以开始去做其它的事。内核收到一个异步
IO read
之后,会立刻返回,不会阻塞用户进程。 - 内核会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,内核会给用户进程发送一个
signal
告诉它read操作完成了
- 用户进程发起read操作之后,立刻就可以开始去做其它的事。内核收到一个异步
4.Unix图示
贴一下Unix编程里面的图:
3.3.IO多路复用
开始之前咱们通过非阻塞IO引入一下:(来个简单例子socket.setblocking(False)
)
import time
import socket
def select(socket_addr_list):
for client_socket, client_addr in socket_addr_list:
try:
data = client_socket.recv(2048)
if data:
print(f"[来自{client_addr}的消息:]
")
print(data.decode("utf-8"))
client_socket.send(
b"HTTP/1.1 200 ok
Content-Type: text/html;charset=utf-8
<h1>Web Server Test</h1>"
)
else:
# 没有消息是触发异常,空消息是断开连接
client_socket.close() # 关闭客户端连接
socket_addr_list.remove((client_socket, client_addr))
print(f"[客户端{client_addr}已断开连接,当前连接数:{len(socket_addr_list)}]")
except Exception:
pass
def main():
# 存放客户端集合
socket_addr_list = list()
with socket.socket() as tcp_server:
# 防止端口绑定的设置
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_server.bind((‘‘, 8080))
tcp_server.listen()
tcp_server.setblocking(False) # 服务端非阻塞
while True:
try:
client_socket, client_addr = tcp_server.accept()
client_socket.setblocking(False) # 客户端非阻塞
socket_addr_list.append((client_socket, client_addr))
except Exception:
pass
else:
print(f"[来自{client_addr}的连接,当前连接数:{len(socket_addr_list)}]")
# 防止客户端断开后出错
if socket_addr_list:
# 轮询查看客户端有没有消息
select(socket_addr_list) # 引用传参
time.sleep(0.01)
if __name__ == "__main__":
main()
输出:
可以思考下:
- 为什么Server也要设置为非阻塞?
- PS:一个线程里面只能有一个死循环,现在程序需要两个死循环,so ==> 放一起咯
- 断开连接怎么判断?
- PS:没有消息是触发异常,空消息是断开连接
- client_socket为什么不用dict存放?
- PS:dict在循环的过程中,del会引发异常
1.Select
select和上面的有点类似,就是轮询的过程交给了操作系统:
kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程
来个和上面等同的案例:
import select
import socket
def main():
with socket.socket() as tcp_server:
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_server.bind((‘‘, 8080))
tcp_server.listen()
socket_info_dict = dict()
socket_list = [tcp_server] # 监测列表
while True:
# 劣势:select列表数量有限制
read_list, write_list, error_list = select.select(
socket_list, [], [])
for item in read_list:
# 服务端迎接新的连接
if item == tcp_server:
client_socket, client_address = item.accept()
socket_list.append(client_socket)
socket_info_dict[client_socket] = client_address
print(f"[{client_address}已连接,当前连接数:{len(socket_list)-1}]")
# 客户端发来
else:
data = item.recv(2048)
if data:
print(data.decode("utf-8"))
item.send(
b"HTTP/1.1 200 ok
Content-Type: text/html;charset=utf-8
<h1>Web Server Test</h1>"
)
else:
item.close()
socket_list.remove(item)
info = socket_info_dict[item]
print(f"[{info}已断开,当前连接数:{len(socket_list)-1}]")
if __name__ == "__main__":
main()
输出和上面一样
扩展说明:
select 函数监视的文件描述符分3类,分别是
writefds
、readfds
、和exceptfds
。调用后select函数会阻塞,直到有描述符就绪函数返回(有数据可读、可写、或者有except)或者超时(timeout指定等待时间,如果立即返回设为null即可)
select的一个缺点在于单个进程能够监视的文件描述符的数量存在最大限制,在Linux上一般为1024(64位=>2048)
然后Poll就出现了,就是把上限给去掉了,本质并没变,还是使用的轮询
2.EPoll
epoll在内核2.6中提出(Linux独有),使用一个文件描述符管理多个描述符,将用户关心的文件描述符的事件存放到内核的一个事件表中,采用监听回调的机制,这样在用户空间和内核空间的copy只需一次,避免再次遍历就绪的文件描述符列表
先来看个案例吧:(输出和上面一样)
import socket
import select
def main():
with socket.socket() as tcp_server:
tcp_server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
tcp_server.bind((‘‘, 8080))
tcp_server.listen()
# epoll是linux独有的
epoll = select.epoll()
# tcp_server注册到epoll中
epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
# key-value
fd_socket_dict = dict()
# 回调需要自己处理
while True:
# 返回可读写的socket fd 集合
poll_list = epoll.poll()
for fd, event in poll_list:
# 服务器的socket
if fd == tcp_server.fileno():
client_socket, client_addr = tcp_server.accept()
fd = client_socket.fileno()
fd_socket_dict[fd] = (client_socket, client_addr)
# 把客户端注册进epoll中
epoll.register(fd, select.EPOLLIN | select.EPOLLET)
else: # 客户端
client_socket, client_addr = fd_socket_dict[fd]
data = client_socket.recv(2048)
print(
f"[来自{client_addr}的消息,当前连接数:{len(fd_socket_dict)}]
")
if data:
print(data.decode("utf-8"))
client_socket.send(
b"HTTP/1.1 200 ok
Content-Type: text/html;charset=utf-8
<h1>Web Server Test</h1>"
)
else:
del fd_socket_dict[fd]
print(
f"[{client_addr}已离线,当前连接数:{len(fd_socket_dict)}]
"
)
# 从epoll中注销
epoll.unregister(fd)
client_socket.close()
if __name__ == "__main__":
main()
扩展:epoll的两种工作模式
LT(level trigger,水平触发)模式:当epoll_wait检测到描述符就绪,将此事件通知应用程序,应用程序可以不立即处理该事件。下次调用epoll_wait时,会再次响应应用程序并通知此事件。LT模式是默认的工作模式。
LT模式同时支持阻塞和非阻塞socket。
ET(edge trigger,边缘触发)模式:当epoll_wait检测到描述符就绪,将此事件通知应用程序,应用程序必须立即处理该事件。如果不处理,下次调用epoll_wait时,不会再次响应应用程序并通知此事件。
ET是高速工作方式,只支持非阻塞socket(ET模式减少了epoll事件被重复触发的次数,因此效率要比LT模式高)
Code提炼一下:
- 实例化对象:
epoll = select.epoll()
- 注册对象:
epoll.register(tcp_server.fileno(), select.EPOLLIN | select.EPOLLET)
- 注销对象:
epoll.unregister(fd)
PS:epoll
不一定比Select
性能高,一般都是分场景的:
- 高并发下,连接活跃度不高时:epoll比Select性能高(eg:web请求,页面随时关闭)
- 并发不高,连接活跃度比较高:Select更合适(eg:小游戏)
- Select是win和linux通用的,而epoll只有linux有
其实IO多路复用还有一个kqueue
,和epoll
类似,下面的通用写法中有包含
3.通用写法(Selector
)
一般来说:Linux下使用epoll,Win下使用select(IO多路复用会这个通用的即可)
先看看Python源代码:
# 选择级别:epoll|kqueue|devpoll > poll > select
if ‘KqueueSelector‘ in globals():
DefaultSelector = KqueueSelector
elif ‘EpollSelector‘ in globals():
DefaultSelector = EpollSelector
elif ‘DevpollSelector‘ in globals():
DefaultSelector = DevpollSelector
elif ‘PollSelector‘ in globals():
DefaultSelector = PollSelector
else:
DefaultSelector = SelectSelector
实战案例:(可读和可写可以不分开)
import socket
import selectors
# Linux下使用epoll,Win下使用select
Selector = selectors.DefaultSelector()
class Task(object):
def __init__(self):
# 存放客户端fd和socket键值对
self.fd_socket_dict = dict()
def run(self):
self.server = socket.socket()
self.server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
self.server.bind((‘‘, 8080))
self.server.listen()
# 把Server注册到epoll
Selector.register(self.server.fileno(), selectors.EVENT_READ,
self.connected)
def connected(self, key):
"""客户端连接时处理"""
client_socket, client_address = self.server.accept()
fd = client_socket.fileno()
self.fd_socket_dict[fd] = (client_socket, client_address)
# 注册一个客户端读的事件(服务端去读消息)
Selector.register(fd, selectors.EVENT_READ, self.call_back_reads)
print(f"{client_address}已连接,当前连接数:{len(self.fd_socket_dict)}")
def call_back_reads(self, key):
"""客户端可读时处理"""
# 一个fd只能注册一次,监测可写的时候需要把可读给注销
Selector.unregister(key.fd)
client_socket, client_address = self.fd_socket_dict[key.fd]
print(f"[来自{client_address}的消息:]
")
data = client_socket.recv(2048)
if data:
print(data.decode("utf-8"))
# 注册一个客户端写的事件(服务端去发消息)
Selector.register(key.fd, selectors.EVENT_WRITE,
self.call_back_writes)
else:
client_socket.close()
del self.fd_socket_dict[key.fd]
print(f"{client_address}已断开,当前连接数:{len(self.fd_socket_dict)}")
def call_back_writes(self, key):
"""客户端可写时处理"""
Selector.unregister(key.fd)
client_socket, client_address = self.fd_socket_dict[key.fd]
client_socket.send(b"ok")
Selector.register(key.fd, selectors.EVENT_READ, self.call_back_reads)
def main():
t = Task()
t.run()
while True:
ready = Selector.select()
for key, obj in ready:
# 需要自己回调
call_back = key.data
call_back(key)
if __name__ == "__main__":
main()
Code提炼一下:
- 实例化对象:
Selector = selectors.DefaultSelector()
- 注册对象:
Selector.register(server.fileno(), selectors.EVENT_READ, call_back)
Selector.register(server.fileno(), selectors.EVENT_WRITE, call_back)
- 注销对象:
Selector.unregister(key.fd)
- 注意一下:一个fd只能注册一次,监测可写的时候需要把可读给注销(反之一样)
业余拓展:
select, iocp, epoll,kqueue及各种I/O复用机制
https://blog.csdn.net/shallwake/article/details/5265287
kqueue用法简介
http://www.cnblogs.com/luminocean/p/5631336.html
以上是关于io多路复用的主要内容,如果未能解决你的问题,请参考以下文章