Socket 多任务

Posted juno3550

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Socket 多任务相关的知识,希望对你有一定的参考价值。

1. 循环版实现多连接

2. threading.Thread 多线程(传输文件) 

3. SockerServer 实现多任务

3.1 ForkingMixIn - 多进程(限 linux)

3.2 ThreadingMixIn - 多线程

3.3 ThreadingTCPServer - 线程池

4. Select 模块(单线程实现多线程效果)

 

 

1. 循环版实现多连接

以下例子算狭义上实现多用户访问服务,但都是同步执行,也就是一个用户连接关闭,下个用户才可以开始执行向服务发送请求数据。

其实现的核心是服务器接收连接部分写在死循环内,可以一直保持接收新用户端发起的请求的状态。

服务器端

 1 import socket
 2 
 3 HOST = 127.0.0.1
 4 PORT = 50008
 5 s = socket.socket(socket.AF_INET,socket.SOCK_STREAM)
 6 s.bind((HOST, PORT))
 7 s.listen(5)
 8 
 9 while True:
10        print ("开始进入监听状态...")
11        conn, addr = s.accept()
12        print ("接收到连接:", addr)
13        while True:
14            try:
15                data = conn.recv(1024)
16                if not data:
17                    print("断开客户端连接")
18                    break
19                print ("收到客户端数据:", data.decode("utf-8"))
20                msg = "这是一个循环版多连接服务测试"
21                conn.sendall(msg.encode("utf-8"))
22            except socket.error:
23                break
24        conn.close()

客户端

 1 import socket
 2 
 3 HOST = 127.0.0.1
 4 PORT = 50008
 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 6 
 7 s.connect((HOST, PORT))  
 8 times = 3
 9 while times>0:
10     cmd = input("向服务器发送数据:")   
11     s.sendall(cmd.encode("utf-8")) 
12     data = s.recv(1024)
13     print ("接收到服务器端的数据:", data.decode("utf-8"))
14     times -= 1
15 s.close()  # 关闭连接

 

2. threading.Thread 多线程(传输文件) 

服务端定义一个传输内容得规则;客户端按照此内容进行传输;服务端按照此内容进行解析。

服务器端

 1 import socket, time, socketserver, struct, os, threading
 2 
 3 # 固定的server启动流程
 4 host = 127.0.0.1
 5 port = 12307
 6 # 定义socket类型
 7 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)  
 8 # 绑定需要监听的ip和端口号
 9 s.bind((host, port))  
10 s.listen(1)
11 
12 
13 # 定义一个线程的任务函数
14 def conn_thread(connection, address):  
15     while True:
16         try:
17             connection.settimeout(600)
18             # struct.calcsize--->计算12个字符和1个长整型的数据有多长
19             #12个字符对应的是客户端发来的文件名,1个长整型对应的是文件内容的长度大小
20             fileinfo_size = struct.calcsize(36sl)  # 12s表示12个字符,l表示一个长整型数
21             buf = connection.recv(fileinfo_size)  # 正好接收12个字符长度和一个整数的数据
22             # 如果不加这个if,第一个文件传输完成后会自动走到下一句并阻塞,需要拿到文件大小信息才可以继续执行
23             if buf:  
24                 filename, filesize = struct.unpack(36sl, buf)
25                 filename_f = filename.decode("utf-8").strip(0)  # C语言中‘‘是一个ASCII码为0的字符,在python中表示占一个位置的空字符
26                 print("****filename:",filename_f)
27                 # 拿到文件名之后,我们要拼接一个新的文件绝对路径,在服务器上保存下来这个问题件
28                 filenewname = os.path.join(e:\\, os.path.basename(filename_f))
29                 print(u文件名称: %s , 文件大小: %s % (filenewname, filesize))
30                 recvd_size = 0  # 收的文件内容有多大了
31                 file = open(filenewname,wb)
32                 print(u"开始传输文件内容")
33                 while not recvd_size == filesize:  # 如果收到的文件长度不等于文件真实长度,就一直循环收
34                     if filesize - recvd_size > 1024:  # 文件大小和已经收的大小相差大于1024字节
35                         rdata = connection.recv(1024)  # 每次收1024个字节的内容
36                         recvd_size += len(rdata)  # 收到的长度在recvd_size累加
37                     else:
38                         # 用实际的文件大小减去已经收的差值去收(收最后剩余的大小)
39                         rdata = connection.recv(filesize-recvd_size)
40                         recvd_size = filesize  # 把实际收到的长度相加,recvd_size == filesize
41                     file.write(rdata)  # 把文件的内容写进去
42                 file.close()
43                 print(receive done)
44                 # connection.close()
45         except socket.timeout:
46             connection.close()
47 
48 while True:
49     print(u"开始进入监听状态")
50     connection, address = s.accept()
51     print(Connected by , address)
52     # 起一个子线程去收文件
53     thread = threading.Thread(target=conn_thread, args=(connection, address))
54     thread.start()
55     thread.join()  # 阻塞,等都收完了才会关掉连接
56     # s.close()

客户端

 1 import socket, os, struct
 2 
 3 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 4 s.connect((127.0.0.1, 12307))
 5 while True:
 6     filepath = input(请输入要传输的文件绝对路径:)
 7     print(type(filepath))
 8     print(len(filepath.encode("utf-8")))
 9     if os.path.isfile(filepath):  # 判断文件在不在,在就传输。
10         #fileinfo_size = struct.calcsize(‘36sl‘)  # 定义打包规则(需与服务器端规则一致)
11         # 定义文件头信息,包含文件名和文件大小
12         fhead = struct.pack(36sl, filepath.encode("utf-8"), os.stat(filepath).st_size)
13         print(os.stat(filepath).st_size)
14         s.send(fhead)
15         print (u文件路径:, filepath)
16         # with open(filepath,‘rb‘) as fo: 这样发送文件有问题,发送完成后还会发一些东西过去
17         fo = open(filepath, rb)
18         while True:
19             filedata = fo.read(1024)
20             if not filedata:
21                 break
22             s.send(filedata)
23         fo.close()
24         print (u传输成功)
25         # s.close()

 

3. SockerServer 实现多任务

Socket 编程在模块创建时无法进行多进程的处理,当有大量请求时,请求就会阻塞在队列中,甚至发生请求丢弃,如果需要大量 socket 就需要许多的 socket 绑定端口,写很多重复性得代码。

SocketServer 简化了网络服务器的编写。在进行 socket 创建时,使用 SocketServer 会大大减少创建的步骤,并且 SocketServer 使用了 select,它有4个类:TCPServer、UDPServer、UnixStreamServer和UnixDatagramServer,这4个类是同步进行处理的,另外通过 ForkingMixIn 和 ThreadingMixIn 类来支持异步。

ForkingMixIn 和 ThreadingMixIn 两个混合类,它们都提供 Server 类中 process_request 方法的新实现,前者在处理每次用户连接的时候都会开启新的进程,而后者会开启新的线程。想要让Server类实现并发处理,只用利用多继承即可,或者直接使用已经混合好的类。

使用步骤:

  1. 创建一个请求处理的类,是 BaseRequestHandler 的子类并重写其 handle 方法;
  2. 实例化一个服务器类,传入服务器的地址和请求处理的程序类;
  3. 调用 handle_request(),一般是调用其他事件循环或者使用 select 或 serve_forever。

集成 ThreadingMixIn 类时需要处理异常关闭。daemon_threads 指示服务器是否要等待线程终止,要是线程互相独立,必须要设置为 True,默认是 False。

 

3.1 ForkingMixIn - 多进程(限 linux)

多个连接同时到达服务器端的时候,主进程都会生成一个子进程专门处理此连接,而主进程则依旧保持监听状态。

因主进程和子进程是同时进行的,所以不会阻塞新的连接。但由于创建进程所消耗的资源比较大,这种处理方式在有大量连接时会带来性能问题。

服务器端

 1 from socketserver import TCPServer, ForkingMixIn, StreamRequestHandler
 2 import time
 3 
 4 
 5 # 自定义服务器类
 6 class Server(ForkingMixIn, TCPServer):  
 7     pass
 8 
 9 # 处理请求的程序类
10 class MyHandler(StreamRequestHandler):
11 
12     # 重写父类的handle函数
13     def handle(self):  
14         addr = self.request.getpeername()  # 获得客户端的地址
15         print(接收到连接:, addr)  # 打印客户端地址
16         data = self.rfile.readline().strip()  # 客户端发送的信息必须带有回车,否则会一直等待客户端继续发送数据
17         print("从客户端接收到的请求:", data.decode("utf-8"))
18         time.sleep(1)
19         if data:
20             self.wfile.write(这是从服务端进程中发出的消息.encode("utf-8"))  # 给客户端发送信息
21 
22 
23 host = ""
24 port = 18001
25 # 实例化一个服务器类,传入服务器的地址和请求处理的程序类
26 server = Server((host, port), MyHandler)
27 print("开始监听状态...")
28 # 开始侦听并处理连接
29 server.serve_forever()

客户端

1 if __name__ == __main__:
2     import socket
3     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
4     sock.connect((127.0.0.1, 18001))
5     import time
6     time.sleep(2)
7     sock.send(ls -al /home/wxh.encode("utf-8")+"
".encode("utf-8"))
8     print (sock.recv(1024).decode("utf-8"))
9     sock.close()

执行效果

[gr@gloryroad juno]$ python3 server.py
开始监听状态...
接收到连接: (127.0.0.1, 42444)
从客户端接收到的请求: ls -al /home/wxh
接收到连接: (127.0.0.1, 42446)
从客户端接收到的请求: ls -al /home/wxh

 

3.2 ThreadingMixIn - 多线程

线程是一种轻量级的进程,比 Fork 消耗的资源更少,而且主线程和子线程之间共享相同的内存空间,处理效率高。但大量的使用线程会带来线程之间的数据同步问题,处理不好可能使服务程序失去响应。

以下示例与 Fork 方式中代码基本相同,仅仅是采用的 ThreadingMixIn 类不同。

服务器端

 1 from socketserver import TCPServer, ThreadingMixIn, StreamRequestHandler
 2 import time
 3 
 4 
 5 # 自定义服务器类
 6 class Server(ThreadingMixIn, TCPServer):
 7     pass
 8 
 9 # 处理请求的程序类
10 class MyHandler(StreamRequestHandler):
11 
12     # 重写父类的handle函数
13     def handle(self):
14         addr = self.request.getpeername()  # 获得客户端的地址
15         print(接收到连接:, addr)  # 打印客户端地址
16         data = self.rfile.readline().strip()  # 客户端发送的信息必须带有回车,否则会一直等待客户端继续发送数据
17         print("从客户端接收到的请求:", data.decode("utf-8"))
18         time.sleep(1)
19         if data:
20             self.wfile.write(这是从服务端线程中发出的消息.encode("utf-8"))  # 给客户端发送信息
21 
22 
23 host = ‘‘
24 port = 18001
25 # 实例化一个服务器类,传入服务器的地址和请求处理的程序类
26 server = Server((host, port), MyHandler)
27 print("开始监听状态...")
28 # 开始侦听并处理连接
29 server.serve_forever()

客户端

1 if __name__ == __main__:
2     import socket
3     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
4     sock.connect((127.0.0.1, 18001))
5     import time
6     time.sleep(2)
7     sock.send(ls -al /home/wxh.encode("utf-8")+"
".encode("utf-8"))
8     print (sock.recv(1024).decode("utf-8"))
9     sock.close()

 

3.3 ThreadingTCPServer - 线程池

技术图片

 服务器端

 1 import socketserver
 2 import threading
 3 
 4 # 自定义任务线程类
 5 class MyTCPHandler(socketserver.BaseRequestHandler):
 6     # 重写 handle 方法
 7     def handle(self):
 8         while True:
 9             print("接收到连接: ", self.client_address)
10             self.data = self.request.recv(1024).strip()
11             cur_thread = threading.current_thread()
12             print("当前线程: ", cur_thread)
13             if not self.data:
14                 print("客户端[%s]退出!" % self.client_address[0])
15                 break
16             print("客户端[%s]请求数据: %s" % (self.client_address[0], self.data.decode("utf-8")))
17             self.request.sendall(self.data.upper())
18 
19 if __name__ == "__main__":
20     HOST, PORT = "", 18001
21     server = socketserver.ThreadingTCPServer((HOST, PORT), MyTCPHandler)
22     print("开始监听状态...")
23     server.serve_forever()

客户端

1 if __name__ == __main__:
2     import socket
3     sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
4     sock.connect((127.0.0.1, 18001))
5     import time
6     time.sleep(2)
7     sock.send(ls -al /home/wxh.encode("utf-8")+"
".encode("utf-8"))
8     print (sock.recv(1024).decode("utf-8"))
9     sock.close()

执行效果

开始监听状态...
接收到连接:  (127.0.0.1, 17215)
当前线程:  <Thread(Thread-1, started 9620)>
客户端[127.0.0.1]请求数据: ls -al /home/wxh
接收到连接:  (127.0.0.1, 17215)
当前线程:  <Thread(Thread-1, started 9620)>
客户端[127.0.0.1]退出!
接收到连接:  (127.0.0.1, 17216)
当前线程:  <Thread(Thread-2, started 9496)>
客户端[127.0.0.1]请求数据: ls -al /home/wxh
接收到连接:  (127.0.0.1, 17216)
当前线程:  <Thread(Thread-2, started 9496)>
客户端[127.0.0.1]退出!

 

4. Select 模块(单线程实现多线程效果)

在 python 中,select 函数是一个对底层操作系统的直接访问的接口。它用来监控 sockets、files 和 pipes,等待IO完成(Waiting for I/O completion)。当有可读、可写或是异常事件产生时,select 可以很容易的监控到。

Select 模块在 windows、Unix 和 Linux 下均可使用,但在 windows 下,select 只能用于处理 socket。

 

Select 模块支持 C 中常用的 I/O 复用:

  • I/O 复用:监听多个描述符的状态,如果描述符状态改变,则会被内核修改标志位,从而被进程获取进而进行读写操作。
  • 文件描述符:内核(kernel)利用文件描述符(file descriptor)来访问文件。文件描述符是非负整数。打开现存文件或新建文件时,内核会返回一个文件描述符。

I/O 复用是在单线程模式下实现多线程的效果,即实现一个多 I/O 并发的效果。

进程指定内核监听哪些文件描述符(最多监听1024个fd)的哪些事件,当没有文件描述符事件发生时,进程被阻塞;当一个或者多个文件描述符事件发生时,进程被唤醒。

 

当调用 select() 时:

  1. 上下文切换转换为内核态(当一个任务(进程)执行内核代码时,称进程处于内核态,此时处理器处于特权级最高的(0级)内核代码中执行);
  2. 将fd从用户空间复制到内核空间;
  3. 内核遍历所有 fd(文件描述符),查看其对应事件是否发生;
  4. 如果没发生,将进程阻塞,当设备驱动产生中断或者timeout时间后,将进程唤醒,再次进行遍历;
  5. 返回遍历后的 fd;
  6. fd从内核空间复制到用户空间。

技术图片

 

select.select(rlist, wlist, xlist[, timeout]) 参数含义:

  1. rlist:输入而观察的文件对象列表
  2. wlist:输出而观察的文件对象列表
  3. xlist:观察错误异常的文件列表
  4. timeout:可选参数,表示超时秒数,如果为 None 或者为空则阻塞直到事件发生。

其返回1个 tuple,分别是3个准备好的对象列表,它和前边的参数是一样的顺序。

 

服务器端代码示例

 1 import socket
 2 import select
 3 
 4 s = socket.socket()
 5 s.bind(("127.0.0.1", 8888))
 6 s.listen(5)
 7 
 8 # 要监控的对象列表
 9 r_list = [s, ] 
10 num = 0
11 
12 # server的死循环
13 while True:
14     print("监听中...")
15 
16     # 第1个参数 r_list:可读的对象
17     # 第2个参数:可写的对象(基本不用)
18     # 第3个参数:出现异常的对象(基本不用)
19     # 这三个参数内容都是被操作系统监控的:
20     #   1)当有事件发生时,立马往下执行代码;否则阻塞监控10秒
21     #   2)若监控10秒了仍无事件发生,才往下执行
22     rl, wl, error = select.select(r_list, [], [], 10)
23     # rl 列表一开始为空,只有当s发生事件了(收到连接请求),才会将s加到rl中
24 
25     num += 1
26     print("执行次数:%s" % num)
27     print("rl‘s length is: %s" % len(rl))
28     print("r_list‘s length: %s" % len(r_list))
29     print("r1中的对象:", [i for i in rl])
30 
31     # 只有发生两种事件(有新连接或收到数据)时,rl列表中才会有对象元素,for循环才会往下执行
32     for fd in rl:
33         # 如果发生事件的对象是服务器端对象(s),则代表有新客户端连接
34         if fd == s:
35             conn, addr = fd.accept()  # 建立与客户端的连接
36             r_list.append(conn)  # 将连接对象放到监听列表r_list中
37             # 只有当客户端断开连接(close)了,conn才会从r_list中剔除
38             msg = conn.recv(200)  # 接收客户端的数据
39             # 把收到的数据变大写返回给客户端
40             conn.sendall(("first received data: %s" % msg.upper()).encode("utf-8"))
41             print("给客户端返回数据:", ("first responsed data: %s" % msg.upper().decode("utf-8")))
42             # s处理完后,则从rl中剔除了
43         # 如果发生事件的对象是连接对象(conn),则代表收到客户端请求数据
44         else:
45             try:
46                 msg = fd.recv(200)
47                 fd.sendall(("else received data: %s" % msg.upper()).encode("utf-8"))
48                 print("给客户端返回数据:", ("else responsed data: %s" % msg.upper().decode("utf-8")))
49             except (ConnectionAbortedError, ConnectionResetError):
50                 r_list.remove(fd)
51                 conn.close()
52             # conn处理完后,则从rl中剔除了
53 s.close()

 客户端代码示例

 1 import socket
 2 
 3 HOST = 127.0.0.1
 4 PORT = 8888
 5 s = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
 6 
 7 s.connect((HOST, PORT))  
 8 for i in range(4):
 9     cmd = input("向服务器发送数据:")   
10     s.sendall(cmd.encode("utf-8")) 
11     data = s.recv(1024)
12     print ("接收到服务器端的数据:", data.decode("utf-8"))
13 s.close()  # 关闭连接

执行效果

服务器端:

E:>python server.txt
监听中...
执行次数:1
rls length is: 0
r_lists length: 1
r1中的对象: []
监听中...
执行次数:2
rls length is: 0
r_lists length: 1
r1中的对象: []
监听中...
执行次数:3
rls length is: 0
r_lists length: 1
r1中的对象: []
监听中...
执行次数:4
rls length is: 0
r_lists length: 1
r1中的对象: []
监听中...
执行次数:5
rls length is: 0
r_lists length: 1
r1中的对象: []
监听中...
执行次数:6
rls length is: 1
r_lists length: 1
r1中的对象: [<socket.socket fd=504, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(127.0.0.1, 8888)>]
给客户端返回数据: first responsed data: HELLO 1
监听中...
执行次数:7
rls length is: 0
r_lists length: 2
r1中的对象: []
监听中...
执行次数:8
rls length is: 1
r_lists length: 2
r1中的对象: [<socket.socket fd=544, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(127.0.0.1, 8888), raddr=(127.0.0.1, 18401)>]
给客户端返回数据: else responsed data: HELLO 2
监听中...
执行次数:9
rls length is: 0
r_lists length: 2
r1中的对象: []
监听中...
执行次数:10
rls length is: 1
r_lists length: 2
r1中的对象: [<socket.socket fd=544, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(127.0.0.1, 8888), raddr=(127.0.0.1, 18401)>]
给客户端返回数据: else responsed data: HELLO 3
监听中...
执行次数:11
rls length is: 1
r_lists length: 2
r1中的对象: [<socket.socket fd=544, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(127.0.0.1, 8888), raddr=(127.0.0.1, 18401)>]
给客户端返回数据: else responsed data: HELLO 4
监听中...
执行次数:12
rls length is: 1
r_lists length: 2
r1中的对象: [<socket.socket fd=544, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(127.0.0.1, 8888), raddr=(127.0.0.1, 18401)>]
给客户端返回数据: else responsed data:
监听中...
执行次数:13
rls length is: 1
r_lists length: 2
r1中的对象: [<socket.socket fd=544, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(127.0.0.1, 8888), raddr=(127.0.0.1, 18401)>]
监听中...
执行次数:14
rls length is: 0
r_lists length: 1
r1中的对象: []
监听中...

客户端:

E:>python client.txt
向服务器发送数据:hello 1
接收到服务器端的数据: first received data: bHELLO 1
向服务器发送数据:hello 2
接收到服务器端的数据: else received data: bHELLO 2
向服务器发送数据:hello 3
接收到服务器端的数据: else received data: bHELLO 3
向服务器发送数据:hello 4
接收到服务器端的数据: else received data: bHELLO 4

 

 

 

以上是关于Socket 多任务的主要内容,如果未能解决你的问题,请参考以下文章

Socket 多任务

多任务-python实现-UDP多线程聊天(2.1.6)

多线程 Thread 线程同步 synchronized

多个用户访问同一段代码

多个请求是多线程吗

详解C++多线程