IO多路复用
Posted 宁信
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了IO多路复用相关的知识,希望对你有一定的参考价值。
1.事件驱动模型
上一篇写的协程仅仅是切换,本身不能实现并发,什么时候切换也不知道
那么什么时候切回去呢?怎么确定IO操作完了?通过回调函数
对于事件驱动型程序模型,它的流程大致如下: 开始--->初始化--->等待
事件驱动程序在启动之后,就在那等待,等待什么呢?等待被事件触发。传统编程下也有“等待”的时候,比如在代码块D中,你定义了一个input(),需要用户输入数据。但这与下面的等待不同,传统编程的“等待”,比如input(),你作为程序编写者是知道或者强制用户输入某个东西的,或许是数字,或许是文件名称,如果用户输入错误,你还需要提醒他,并请他重新输入。事件驱动程序的等待则是完全不知道,也不强制用户输入或者干什么。只要某一事件发生,那程序就会做出相应的“反应”。这些事件包括:输入信息、鼠标、敲击键盘上某个键还有系统内部定时器触发。
比如socketserver,多个客户端连接,单线程下实现并发效果,就叫多路复用。
文件描述符在形式上是一个非负整数。实际上,它是一个索引值,指向内核为每一个进程所维护的该进程打开文件的记录表。当程序打开一个现有文件或者创建一个新文件时,内核向进程返回一个文件描述符。在程序设计中,一些涉及底层的程序编写往往会围绕着文件描述符展开。但是文件描述符这一概念往往只适用于UNIX、Linux这样的操作系统。
# 在linux的IO多路复用中有水平触发,边缘触发两种模式,这两种模式的区别如下: # # 水平触发:如果文件描述符已经就绪可以非阻塞的执行IO操作了,此时会触发通知.允许在任意时刻重复检测IO的状态, # 没有必要每次描述符就绪后尽可能多的执行IO.select,poll就属于水平触发. # # 边缘触发:如果文件描述符自上次状态改变后有新的IO活动到来,此时会触发通知.在收到一个IO事件通知后要尽可能 # 多的执行IO操作,因为如果在一次通知中没有执行完IO那么就需要等到下一次新的IO活动到来才能获取到就绪的描述 # 符.信号驱动式IO就属于边缘触发.
select调用是内核级别的,select轮询相对非阻塞的轮询的区别在于—前者可以等待多个socket,能实现同时对多个IO端口进行监听,当其中任何一个socket的数据准好了,就能返回进行可读,然后进程再进行recvfrom系统调用,将数据由内核拷贝到用户进程,当然这个过程是阻塞的。
server端
#__author: greg #date: 2017/9/24 21:52 import socket sk1=socket.socket() sk1.bind((‘127.0.0.1‘,8001)) sk1.listen() sk2=socket.socket() sk2.bind((‘127.0.0.1‘,8002)) sk2.listen() sk3=socket.socket() sk3.bind((‘127.0.0.1‘,8003)) sk3.listen() # inputs=[sk1,sk2,sk3,] inputs=[sk1,] import select while True: # [sk1, sk2,sk3,],select内部自动监听sk1,sk2,sk3三个对象,一旦某个句柄发生变化 # 如果有人链接sk1 # r_list=[sk1] #如果有人第一次连接,,sk1发生变化 #句柄列表11, 句柄列表22, 句柄列表33 = select.select(句柄序列1, 句柄序列2, 句柄序列3, 超时时间) r_list,w_list,e_list=select.select(inputs,[],[],1) print(r_list) for sk in r_list: #conn每一个连接对象 conn,address=sk.accept() conn.sendall(bytes(‘hello‘,encoding=‘utf8‘)) conn.close() # while True: # conn, address = sk.accept() # while True: # content_bytes=conn.recv(1024) # content_str=str(content_bytes,encoding=‘utf8‘) # conn.sendall(bytes(content_bytes+‘好‘,encoding=‘utf8‘))#sendall就是用while循环调用send # conn.close()
client端
#__author: greg #date: 2017/9/24 22:30 import socket obj=socket.socket() obj.connect((‘127.0.0.1‘,8001)) content=str(obj.recv(1024),encoding=‘utf8‘) print(content) obj.close()
实例,并发聊天
sk=socket.socket() sk.bind(("127.0.0.1",8801)) sk.listen(5) inputs=[sk,] while True: r,w,e=select.select(inputs,[],[],5) print(len(r)) for obj in r: if obj==sk: conn,add=obj.accept() print(conn) #[<socket.socket fd=420, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8801), raddr=(‘127.0.0.1‘, 55504)>] inputs.append(conn) else: data_byte=obj.recv(1024) print(str(data_byte,‘utf8‘)) inp=input(‘回答%s号客户>>>‘%inputs.index(obj)) obj.sendall(bytes(inp,‘utf8‘)) print(‘>>‘,r)
文件描述符其实就是咱们平时说的句柄,只不过文件描述符是linux中的概念。注意,我们的accept或recv调用时即向系统发出recvfrom请求
(1) 如果内核缓冲区没有数据--->等待--->数据到了内核缓冲区,转到用户进程缓冲区;
(2) 如果先用select监听到某个文件描述符对应的内核缓冲区有了数据,当我们再调用accept或recv时,直接将数据转到用户缓冲区。
如何在某一个client端退出后,不影响server端和其它客户端正常交流
try: data_byte=obj.recv(1024) print(str(data_byte,‘utf8‘)) inp=input(‘回答%s号客户>>>‘%inputs.index(obj)) obj.sendall(bytes(inp,‘utf8‘)) except Exception: inputs.remove(obj)
select.seclect的四个参数
select(rlist, wlist, xlist[, timeout]) -> (rlist, wlist, xlist) Wait until one or more file descriptors are ready for some kind of I/O. The first three arguments are sequences of file descriptors to be waited for: rlist -- wait until ready for reading wlist -- wait until ready for writing xlist -- wait for an ``exceptional condition‘‘ If only one kind of condition is required, pass [] for the other lists. A file descriptor is either a socket or file object, or a small integer gotten from a fileno() method call on one of those. The optional 4th argument specifies a timeout in seconds; it may be a floating point number to specify fractions of seconds. If it is absent or None, the call will never time out. The return value is a tuple of three lists corresponding to the first three arguments; each contains the subset of the corresponding file descriptors that are ready.
等待一个或多个文件描述符准备好进行某种I / O操作。前三个参数是等待文件描述符的序列:
rlist - 等待直到准备好阅读
wlist - 等待,直到准备好写作
xlist - 等待一个“例外条件”
如果只需要一种条件,则为其他列表传递[]。 文件描述符可以是套接字或文件对象,也可以是其中一个fileno()方法调用获得的小整数。可选的第四个参数指定以秒为单位的超时时间; 它可能是一个浮点数来指定几分之一秒。 如果不存在或无,则调用永远不会超时。返回值是与前三个参数对应的三个列表的元组; 每个都包含准备好的相应文件描述符的子集。
server端
#__author: greg #date: 2017/9/24 22:58 import socket import select sk1=socket.socket() sk1.bind((‘127.0.0.1‘,8001)) sk1.listen() # inputs=[sk1,sk2,sk3,] inputs=[sk1,] outputs=[] message_dict={} while True: # [sk1, sk2,sk3,],select内部自动监听sk1,sk2,sk3三个对象,一旦某个句柄发生变化 # 如果有人链接sk1 # r_list=[sk1] #如果有人第一次连接,,sk1发生变化 # select内部自动监听socket对象,一旦socket变换感知到 r_list,w_list,e_list=select.select(inputs,outputs,inputs,1) print(‘正在监听的socket对象%d‘%len(inputs)) print(r_list) for sk_or_conn in r_list: #conn每一个连接对象 if sk_or_conn==sk1: #表示有新用户来连接 conn,address=sk_or_conn.accept() inputs.append(conn) message_dict[conn]=[] else: #有老用户发消息 try: data_bytes=sk_or_conn.recv(1024) # data_str=str(data_bytes,encoding=‘utf8‘) # sk_or_conn.sendall(bytes(data_str+‘好‘,encoding=‘utf8‘)) except Exception as e: inputs.remove(sk_or_conn) else: #用户正常发送消息 data_str=str(data_bytes,encoding=‘utf8‘) print(data_str) # sk_or_conn.sendall(bytes(data_str+‘好‘,encoding=‘utf8‘)) message_dict[sk_or_conn].append(data_str) outputs.append(sk_or_conn) #w_list仅仅保存了谁给我发过消息 for conn in w_list: recv_str=message_dict[conn][0] del message_dict[conn][0] conn.sendall(bytes(recv_str+‘好‘,encoding=‘utf8‘)) outputs.remove(conn) for sk in e_list: inputs.remove(sk)
客户端
#__author: greg #date: 2017/9/24 22:20 import socket obj=socket.socket() obj.connect((‘127.0.0.1‘,8001)) # content=str(obj.recv(1024),encoding=‘utf8‘) # print(content) while True: inp=input(‘>>>‘) obj.sendall(bytes(inp,encoding=‘utf8‘)) ret=str(obj.recv(1024),encoding=‘utf8‘) print(ret) obj.close()
以上是关于IO多路复用的主要内容,如果未能解决你的问题,请参考以下文章