一个简单的例子,深入研究一下socket的多线程处理任务
Server端:
#!/usr/bin/env python #encoding:utf8 # # 注意:定义encoding时必须在第二行 import socket import Queue import threading from time import sleep host = "127.0.0.1" port = 60283 timeWait = 3 #定义每个线程处理任务时需要的时间,模拟处理任务 ThreadNum = 10 #定义创建的线程 cache = Queue.Queue(maxsize=1000) #定义一个队列 # 处理任务的类 class Server(threading.Thread): def __init__(self, cache, ThreadName): threading.Thread.__init__(self) self.name = ThreadName self.cache = cache def run(self): while True: if not cache.empty(): #判断队列是否为空 conn, addr = cache.get() data = conn.recv(1024) conn.sendall(‘success‘) print ‘cacheData: ‘ + data + ‘; ThreadName: ‘ + self.name + ‘; cacheSize: ‘ + str(self.cache.qsize()) sleep(timeWait) for i in range(ThreadNum): s = Server(cache, str(i)) s.setDaemon(True) #设置为守护模式,当主线程退出时,子线程立即退出 s.start() # 创建Socket s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # 绑定socket s.bind((host, port)) # 设置系统最大等待队列,当连接过多时,系统缓存中可以缓存多少连接,不宜设置过大,消耗内存和cpu s.listen(5) while True: # 循环接受,当接受到连接时,把连接放入队列中,由线程获取后执行 conn, addr = s.accept() cache.put((conn, addr)) conn.close()
Client端:
#!/usr/bin/env python # import socket from time import sleep from threading import Thread host = "127.0.0.1" port = 60283 num = 100 def sirec(s, n): s.sendall(‘sn:‘ + str(i)) data = s.recv(1024) s.close() for i in range(1, 100): s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) s.connect((host, port)) t = Thread(target=sirec, args=(s, 1)) #t.setDaemon(True) t.start() print ‘run over‘
关于队列Queue的使用:
q.qsize() 返回队列的大小
q.empty() 如果队列为空,返回True,反之False
q.full() 如果队列满了,返回True,反之False
q.full 与 maxsize 大小对应
q.get([block[, timeout]]) 获取队列,timeout等待时间
q.get_nowait() 相当q.get(False)
非阻塞 q.put(item) 写入队列,timeout等待时间
q.put_nowait(item) 相当q.put(item, False)
q.task_done() 在完成一项工作之后,q.task_done() 函数向任务已经完成的队列发送一个信号
q.join() 实际上意味着等到队列为空,再执行别的操作