通过select监控多个描述符实现并发连接

Posted 昀溪

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了通过select监控多个描述符实现并发连接相关的知识,希望对你有一定的参考价值。

概述

本文通过使用select改写之前的服务器程序通过监控多个套接字描述符来实现并发连接并加入了一些机制让程序更加健壮,不过我们所有的实验都是建立在单词发送数据不会超过1024字节,如果超过你需要做特殊处理。

代码实例

描述符就绪条件

套接字准备好读

以下条件满足之一则套接字准备好读

  • 套接字接收缓冲区中的数据长度大于0
  • 该连接读半部关闭,也就是本端的套接字收到FIN,也就是对方已经发送完数据并执行了四次断开的第一次发送FIN,这时候本端如果继续尝试读取将会得到一个EOF也就是得到空。
  • 套接字是一个监听套接字且已经完成的连接数量大于0,也就是如果监听套接字可读正面有新连接进来那么在连接套接字上条用accept将不会阻塞
  • 套接字产生错误需要进行处理,读取这样的套接字将返回一个错误

套接字准备好写

以下条件满足之一则套接字准备好写

  • 套接字发送缓冲区可以空间大于等于套接字发送缓冲区最低水位,也就是发送缓冲区没有空余空间或者空余空间不足以容纳一个TCP分组(1460-40=1420)。如果不够它就会等。当可以容纳了就表示套接字可写,这个可写是程序把数据发送到套接字发送缓冲区。
  • 该连接写半部关闭,
  • 使用非阻塞式connect的套接字已建立连接或者connect已经失败
  • 有一个错误套接字待处理

服务器端代码

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 # Author: rex.cheny
 4 # E-mail: [email protected]
 5 
 6 import socket
 7 import select
 8 
 9 
10 def echoStr(readAbledSockFD, rList):
11     try:
12         bytesData = readAbledSockFD.recv(1024)
13         data = bytesData.decode(encoding="utf-8")
14         if data:
15             print("收到客户端 ", readAbledSockFD.getpeername(), " 消息:", data)
16             if data.upper() == "BYE":
17                 print("客户端 ", readAbledSockFD.getpeername(), " 主动断开连接。")
18                 rList.remove(readAbledSockFD)
19                 readAbledSockFD.close()
20             else:
21                 readAbledSockFD.send(data.encode(encoding="utf-8"))
22         else:
23             """
24             如果客户端进程意外终止,那么select将返回,因为该连接套接字收到FIN,所以readAbledSockFD读取的内容是‘‘就是空,数据长度是0
25             也就是你试图读取一个收到FIN的套接字会出现这种情况,通常的错误信息是 "server terminated prematurely"
26             """
27             print("客户端 ", readAbledSockFD.getpeername(), " 意外中断连接。")
28             rList.remove(readAbledSockFD)
29             readAbledSockFD.close()
30     except Exception as err:
31         """
32         这里如果抛出异常通常是因为当连接套接字收到RST之后调用 recv()函数产生的 "Connection reset by peer" 错误,
33         为什么套接字会收到RST,通常是向一个收到FIN的套接字执行写入操作导致的。
34         """
35         print("客户端 ", readAbledSockFD.getpeername(), " 意外中断连接。")
36         rList.remove(readAbledSockFD)
37         readAbledSockFD.close()
38 
39 
40 def main():
41     sockFd = socket.socket()
42     sockFd.bind(("", 5556))
43     sockFd.listen(5)
44 
45     # 这里为什么要把这个监听套接字放入可读列表中呢?服务器监听套接字描述符如果有新连接进来那么该描述符可读
46     rList = [sockFd]
47     wList = []
48     eList = []
49 
50     print("等待客户端连接......")
51     while True:
52         """
53         select(),有4个参数,前三个必须也就是感兴趣的描述符,第四个是超时时间
54         第一个参数:可读描述符列表
55         第二个参数:可写描述符列表
56         第三个参数:错误信息描述符列表
57         对于自己的套接字来说,输入表示可以读取,输出表示可以写入,套接字就相当于一个管道,对方的写入代表你的读取,你的写入代表对方的读取
58         
59         select函数返回什么呢?你把感兴趣的描述符加入到列表中并交给select后,当有可读或者有可写或者错误这些描述符就绪后,select就会返回
60         哪些就绪的描述符,你需要做的就是遍历这些描述符逐一进行处理。
61         """
62         readSet, writeSet, errorSet = select.select(rList, wList, eList)
63 
64         # 处理描述符可读
65         for readAbledSockFD in readSet:
66             if readAbledSockFD is sockFd:
67                 try:
68                     connFd, remAddr = sockFd.accept()
69                 except Exception as err:
70                     """
71                     这里处理当三次握手完成后,客户端意外发送了一个RST,这将导致一个服务器错误
72                     """
73                     print("")
74                     continue
75                 print("新连接:", connFd.getpeername())
76                 # 把新连接加入可读列表中
77                 rList.append(connFd)
78             else:
79                 echoStr(readAbledSockFD, rList)
80 
81         # 处理描述符可写
82         for writeAbledSockFd in writeSet:
83             pass
84 
85         # 处理错误描述符
86         for errAbled in errorSet:
87             pass
88 
89 
90 if __name__ == __main__:
91     main()

客户端代码

 1 #!/usr/bin/env python
 2 # -*- coding: utf-8 -*-
 3 # Author: rex.cheny
 4 # E-mail: [email protected]
 5 
 6 import socket
 7 import select
 8 import sys
 9 
10 
11 def echoStr(sockFd, connectionFailed):
12     try:
13         bytesData = sockFd.recv(1024)
14         data = bytesData.decode(encoding="utf-8")
15         if data:
16             print("服务器回复:", data)
17         else:
18             """
19             如果服务器进程意外终止,那么套接字也将返回,因为该连接套接字收到FIN,所以sockFd读取的内容是‘‘就是空,数据长度是0
20             也就是你试图读取一个收到FIN的套接字会出现这种情况,通常的错误信息是 "server terminated prematurely"
21             """
22             print("服务器 ", sockFd.getpeername(), " 意外中断连接。")
23             sockFd.close()
24             connectionFailed = True
25     except Exception as err:
26         """
27         这里如果抛出异常通常是因为当连接套接字收到RST之后调用 recv()函数产生的 "Connection reset by peer" 错误,
28         为什么套接字会收到RST,通常是向一个收到FIN的套接字执行写入操作导致的。
29         """
30         print("服务器 ", sockFd.getpeername(), " 意外中断连接。")
31         sockFd.close()
32         connectionFailed = True
33     return connectionFailed
34 
35 
36 def main():
37     sockFd = socket.socket()
38     sockFd.connect(("127.0.0.1", 5556))
39 
40     # 用于判断服务器是否意外中断
41     connectionFailed = False
42     while True:
43         data = input("等待输入:")
44         if data == "Bye":
45             sockFd.send("Bye".encode(encoding="utf-8"))
46             """
47             shutdown就是主动触发关闭套接字,发送FIN,后面的参数是关闭写这一半,其实就是告诉服务器客户端不会再发送数据了。
48             """
49             sockFd.shutdown(socket.SHUT_WR)
50             break
51         else:
52             sockFd.send(data.encode(encoding="utf-8"))
53             if echoStr(sockFd, connectionFailed):
54                 break
55 
56 
57 if __name__ == __main__:
58     main()

 

以上是关于通过select监控多个描述符实现并发连接的主要内容,如果未能解决你的问题,请参考以下文章

用select实现IO的复用

select多路复用实现多客户端连接服务器

7. I/O复用

Select

IO多路复用简介

dubbo-客户端请求连接并发数量监控