sock以及select原理代码测试
Posted 少数派&蔡先生
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了sock以及select原理代码测试相关的知识,希望对你有一定的参考价值。
#!/usr/bin/python # -*- coding: UTF-8 -*- # IO多路复用 可以监听多个 文件描述符(socket对象)文件句柄 一旦变化即可感知 import socket sk1 = socket.socket() sk1.bind(("127.0.0.1",8090))# 元组 sk1.listen() # sk1 = socket.socket() # sk1.bind(("127.0.0.1",8083))# 元组 # sk1.listen() # # sk2 = socket.socket() # sk2.bind(("127.0.0.1",8084))# 元组 # sk2.listen() #inputs = [sk,sk1,sk2,] inputs = [sk1,] outputs =[] mess = {} import select while True: #返回三个参数 select 内部监听三个参数对象 一旦变化就监听 # 如果连接sk r_li = [sk] 一秒自行一次 # w_li 等同于【】 不发生改变 # e_li 等同于错误(谁发生错误返回谁) # 监听最多1024个 后来poll无限制了 内部底层for循环 # epoll 升级 不在是for循环 r_list,w_list,e_list = select.select(inputs,outputs,inputs,1) print("正在监听的socket对象%d" % len(inputs)) print(r_list) for sk in r_list: # 连接对象 if sk == sk1: conn,address = sk.accept() inputs.append(conn) mess[conn] = [] else: # 老用户发消息了 try: data_bytes = sk.recv(1024) data_str = str(data_bytes,encoding="utf-8") mess[sk].append(data_str) # sk.sendall(bytes(data_str + "好", encoding="utf-8")) except Exception as ex: print(ex) inputs.remove(sk) else: outputs.append(sk) # w_list 仅仅保存谁给我发了消息 for conn in w_list: rec = mess[conn][0] del mess[conn][0] conn.sendall(bytes(rec+"好嘞",encoding="utf-8")) outputs.remove(conn) # 移除错误的连接 # for s in e_li: # inputs.remove(s) # #print(r_li) # conn,address = sk.accept() # content = conn.recv(1024) # content_str = str(content,encoding="utf-8") # print(content_str)
#!/usr/bin/python # -*- coding: UTF-8 -*- import socket obj = socket.socket() obj.connect(("127.0.0.1",8090)) while True: inp = input(">>>") obj.sendall(bytes(inp,encoding="utf-8")) by = obj.recv(1024) st = str(by,encoding="utf-8") print(st) obj.close()
以上是关于sock以及select原理代码测试的主要内容,如果未能解决你的问题,请参考以下文章