Python-IO多路复用
Posted Sch01aR#
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python-IO多路复用相关的知识,希望对你有一定的参考价值。
select实现socket server多并发服务器端
# -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import socket import select import queue server = socket.socket() server.bind((\'127.0.0.1\', 9999)) server.listen() server.setblocking(False) # 要设置为非阻塞 input_list = [server, ] # 本身也要检测 output_list = [] msg_dic = {} while True: stdinput, stdoutput, stderr = select.select(input_list, output_list, input_list) # 第一个input_list参数为要用的连接,ouput_list为可能返回的连接,第二个input_list为可能报错的连接 # stdinput为连接的地址, stdoutput为返回的连接, stderr为错误的连接 print(stdinput, stdoutput, stderr) try: for r in stdinput: if r is server: # 来了个新连接 conn, addr = server.accept() print(\'当前连接客户端:\', addr) input_list.append(conn) msg_dic[conn] = queue.Queue() # 初始化一个队列,用来储存要返回给客户端的数据 else: data = r.recv(1024) print(\'收到数据\', data) msg_dic[r].put(data) output_list.append(r) # 放入返回的连接队列里 except socket.error: print(\'客户端断开连接\') break for w in stdoutput: # 要返回给客户端的连接列表 data_to_client = msg_dic[w].get() w.send(data_to_client) # 把数据发送给客户端 output_list.remove(w) # 确保下次循环的时候stdoutput不返回已经处理完的连接 for e in stderr: if e in output_list: output_list.remove(e) input_list.remove(e) server.close() del msg_dic[e]
客户端
# -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import socket client = socket.socket() client.connect((\'127.0.0.1\', 9999)) while True: msg = input(\'>>>:\').strip() if len(msg) ==0:continue client.send(msg.encode(\'utf-8\')) data = client.recv(1024) print(data)
selector模块
selector模块可以使用select和epoll,它会根据所处的平台来选出最适合的I/O多路复用机制,在windows下为select,在linux下为epoll
通过selector模块实现单线程上万并发的socket server
服务器端
# -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import selectors import socket sel = selectors.DefaultSelector() def accept(sock, mask): conn, addr = sock.accept() print(\'当前连接客户端\', addr) conn.setblocking(False) # 把连接设置为非阻塞 sel.register(conn, selectors.EVENT_READ, read) # 新连接注册read回调函数,如果新的连接发送数据就执行read函数 def read(conn, mask): data = conn.recv(1024) if data: print(\'收到数据:\',data) conn.send(data) else: print(\'客户端关闭\', conn) sel.unregister(conn) # 注销注册的事件 conn.close() server = socket.socket() server.bind((\'localhost\', 9999)) server.listen() server.setblocking(False) sel.register(server, selectors.EVENT_READ, accept) # 注册一个事件,如果来了连接,就调用accept函数 # EVENT_READ,表示可读,值mask为1,EVENT_WRITE表示可写,值mask为2 while True: events = sel.select() # 调用epoll或select,默认阻塞,有活动的连接就返回活动的连接列表 for key, mask in events: callback = key.data # 回调函数,即accept函数 callback(key.fileobj, mask) # key.fileobj为文件句柄,即还没建立连接的socket实例 sel.close() # 最后要关闭,确保所有的资源被释放
客户端
# -*- coding:utf-8 -*- __author__ = "MuT6 Sch01aR" import socket import sys msg = [ b\'python\', b\'php\', b\'java\', ] server_address = (\'127.0.0.1\', 9999) socks = [socket.socket(socket.AF_INET, socket.SOCK_STREAM) for i in range(300)] print(\'connecting to %s port %s\' % server_address) for s in socks: s.connect(server_address) for m in msg: for s in socks: print(\'%s: sending "%s"\' % (s.getsockname(), m)) s.send(m) for s in socks: data = s.recv(1024) print(\'%s: received "%s"\' % (s.getsockname(), data)) if not data: print(sys.stderr, \'closing socket\', s.getsockname())
服务器端运行结果
客户端运行结果
300个socket连接1秒左右就全结束了
以上是关于Python-IO多路复用的主要内容,如果未能解决你的问题,请参考以下文章
请不要再说NIO和多路复用IO是同一个东西了(内含BIONIO多路复用NettyAIO案例测试代码)
Kotlin 协程协程中的多路复用技术 ② ( select 函数原型 | SelectClauseN 事件 | 查看挂起函数是否支持 select )