python-socket和进程线程协程(代码展示)
Posted tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python-socket和进程线程协程(代码展示)相关的知识,希望对你有一定的参考价值。
socket
# 一、socket # TCP服务端 import socket # 导入socket tcp_sk = socket.socket() # 实例化一个服务器对象 tcp_sk.bind((‘127.0.0.1‘, 8080)) # 服务器绑定一个IP地址和端口 tcp_sk.listen() # 监听连接 # conn可以理解为在服务端中与客户端进行交互的操作符(对象) # addr是客户端的IP地址和端口 conn, addr = tcp_sk.accept() # 接受客户端链接,此时如果没有客户端连接过来,服务器会在此等候,不会向下走(阻塞) while True: se_msg = input(‘>>>:‘) conn.send(se_msg.encode(‘utf-8‘)) # send发送的内容必须是bytes类型 re_msg = conn.recv(1024) # recv接收客户端发送过来的内容,1024是接收的字节长度 print(re_msg.decode(‘utf-8‘)) conn.close() # 关闭与客户端的连接 tcp_sk.close() # 关闭服务器 # 注意:一般情况下服务器是不会关闭的,会一直为用户提供服务。 # 客户端 import socket sk = socket.socket() # 实例化一个客户端对象 sk.connect((‘127.0.0.1‘, 8080)) # 连接服务端的IP和端口 while True: re_msg = sk.recv(1024) print(re_msg.decode(‘utf-8‘)) se_msg = input(‘>>>:‘) sk.send(se_msg.encode(‘utf-8‘)) sk.close() # 关闭客户端 # upd服务端 import socket sk = socket.socket(type=socket.SOCK_DGRAM) # 设置type使其成为udp类型的socket sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sk.bind((‘127.0.0.1‘, 8005)) # 绑定IP和端口 msg, addr = sk.recvfrom(1024) # 接收信息和对方IP print(msg.decode(‘utf-8‘)) print(addr) sk.sendto(‘你好‘.encode(‘utf-8‘), addr) # 发送信息给对方 sk.close() # client端: import socket sk = socket.socket(type=socket.SOCK_DGRAM) sk.sendto(‘hello‘.encode(‘utf-8‘), (‘127.0.0.1‘, 8005)) msg, addr = sk.recvfrom(1024) print(msg.decode(‘utf-8‘)) print(addr) sk.close() # 二、socket黏包 # 黏包现象只发生在tcp协议中: # 1.从表面上看,黏包问题主要是因为发送方和接收方的缓存机制、tcp协议面向流通信的特点。 # 2.实际上,主要还是因为接收方不知道消息之间的界限,不知道一次性提取多少字节的数据所造成的 # 解决方案 # 首先介绍一个模块struct:该模块可以把一个类型,如数字,转成固定长度(4)的bytes import struct ret1 = struct.pack(‘i‘, 10238976) # i代表把整型的数据转换成bytes类型的数据 ret2 = struct.pack(‘i‘, 1) print(ret1, len(ret1)) # b‘x00<x9cx00‘ 4 print(ret2, len(ret2)) # b‘x01x00x00x00‘ 4 # 可以看到:数字10238976转成bytes后,长度为4,数字1转成bytes后,长度也是为4。 num1 = struct.unpack(‘i‘, ret1) # unpack把bytes类型转成第一个参数代表的类型(这里是i,也就是int 整型,但返回的是一个元组) print(num1) # (10238976,) 元组 print(num1[0]) # 10238976 取元组的第一个值即可 # 服务端: import socket import struct sk = socket.socket() sk.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) sk.bind((‘127.0.0.1‘, 8002)) sk.listen() conn, addr = sk.accept() while True: msg = input(‘>>>:‘).encode(‘utf-8‘) # 要发送的内容 pack_num = struct.pack(‘i‘, len(msg)) # 计算内容的长度 conn.send(pack_num) conn.send(msg) conn.close() sk.close() # 客户端: import socket import struct sk = socket.socket() sk.connect((‘127.0.0.1‘, 8002)) while True: pack_num = sk.recv(4) num = struct.unpack(‘i‘, pack_num)[0] ret = sk.recv(num) print(ret.decode(‘utf-8‘)) sk.close() # 三、socketserver # TCP服务器(客户端跟之前原生的一样) import socketserver # tcp协议的server端就不需要导入socket class Myserver(socketserver.BaseRequestHandler): # 继承socketserver.BaseRequestHandler这个类 def handle(self): # 必须继承handle方法并重写 print(self.client_address) # 客户端的IP和端口: (‘127.0.0.1‘, 64491) print(type(self.request)) # <class ‘socket.socket‘> 与客户端连接的socket套接字 # <socket.socket fd=500, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, laddr=(‘127.0.0.1‘, 8888), raddr=(‘127.0.0.1‘, 64491)> print(self.request) # 与客户端连接的套接字 conn = self.request # self.request就是客户端的对象 conn.send(b‘helloworld‘) print(conn.recv(1024).decode(‘utf-8‘)) # 设置allow_reuse_address允许服务器重用地址 socketserver.TCPServer.allow_reuse_address = True # 创建一个对象server,绑定ip和端口,相当于sk = socket.socket() sk.bind((‘127.0.0.1‘,8888))这两步的结合 server = socketserver.ThreadingTCPServer((‘127.0.0.1‘, 8888), Myserver) # 让server一直运行下去,除非强制停止程序 server.serve_forever() # 客户端 import socket sk = socket.socket() sk.connect((‘127.0.0.1‘, 8888)) msg = sk.recv(1024) print(msg.decode(‘utf-8‘)) sk.send(b‘hiworld‘) sk.close() # UDP服务器 import socketserver class Myserver(socketserver.BaseRequestHandler): # 继承socketserver.BaseRequestHandler这个类 def handle(self): # 必须继承handle方法并重写 print(self.client_address) # 客户端的IP和端口: (‘127.0.0.1‘, 60575) print(type(self.request)) # udp的时候,request是元组:<class ‘tuple‘> print(self.request[0]) # 客户端的消息msg: b‘dog‘ print(self.request[1]) # udp套接字: <socket.socket fd=480, family=AddressFamily.AF_INET, type=SocketKind.SOCK_DGRAM, proto=0, laddr=(‘127.0.0.1‘, 8888)> sk = self.request[1] sk.sendto(b‘cat‘, self.client_address) # 设置allow_reuse_address允许服务器重用地址 socketserver.UDPServer.allow_reuse_address = True # 创建一个对象server,绑定ip和端口,相当于sk = socket.socket(type=socket.SOCK_DGRAM) sk.bind((‘127.0.0.1‘,8888))这两步的结合 server = socketserver.ThreadingUDPServer((‘127.0.0.1‘, 8888), Myserver) # 让server一直运行下去,除非强制停止程序 server.serve_forever() # 客户端 import socket sk = socket.socket(type=socket.SOCK_DGRAM) sk.sendto(b‘dog‘, (‘127.0.0.1‘, 8888)) msg, addr = sk.recvfrom(1024) print(msg.decode(‘utf-8‘)) sk.close()
进程
# 一、进程Process # 在windows中使用需要注意 # 在Windows操作系统中由于没有fork(linux操作系统中创建进程的机制),在创建子进程的时候会自动 import 启动它的这个文件,而在 import 的时候又执行了整个文件。因此如果将process()直接写在文件中就会无限递归创建子进程报错。 # 所以必须把创建子进程的部分使用if __name__ ==‘__main__‘ 判断保护起来,import 的时候 ,就不会递归运行了。 # join:父进程等待子进程结束后才继续执行自己后续的代码 import time import random from multiprocessing import Process def func(index): time.sleep(random.randint(1, 3)) print(‘第%s封邮件发送完毕‘ % index) if __name__ == ‘__main__‘: p_lst = [] for i in range(10): p = Process(target=func, args=(i,)) p.start() # 先让所有子进程都启动 p_lst.append(p) for p in p_lst: # 再进行join阻塞 p.join() print(‘10封邮件全部发送完毕‘) # 守护进程 # 主进程创建守护进程 # 1:守护进程会在主进程代码执行结束后就终止 # 2:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children # 注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止 # 1,守护进程会在主进程代码执行结束后就终止 import time from multiprocessing import Process def func(): print(‘子进程 start‘) time.sleep(3) # 睡3秒的时候主进程的代码已经执行完毕了,所以子进程也会跟着结束 print(‘子进程end‘) if __name__ == ‘__main__‘: p = Process(target=func) p.daemon = True # daemon是Process的属性 p.start() time.sleep(2) # 睡2秒的时候,执行了子进程 print(‘主进程‘) # 结果: # 子进程 start # 主进程 # 锁 # 加锁降低了程序的效率,让原来能够同时执行的代码变成顺序执行了,异步变同步的过程 # 保证了数据的安全 import time import json from multiprocessing import Process from multiprocessing import Lock # 导入Lock类 def search(person): with open(‘ticket‘) as f: ticketinfo = json.load(f) print(‘%s查询余票:‘ % person, ticketinfo[‘count‘]) def get_ticket(person): with open(‘ticket‘) as f: ticketinfo = json.load(f) time.sleep(0.2) # 模拟读数据的网络延迟 if ticketinfo[‘count‘] > 0: print(‘%s买到票了‘ % person) ticketinfo[‘count‘] -= 1 time.sleep(0.2) with open(‘ticket‘, ‘w‘) as f: json.dump(ticketinfo, f) else: print(‘%s没买到票‘ % person) def ticket(person, lock): search(person) lock.acquire() # 谁获得钥匙 谁才能进入 get_ticket(person) lock.release() # 用完了,把钥匙给下一个人 if __name__ == ‘__main__‘: lock = Lock() # 创建一个锁对象 for i in range(5): p = Process(target=ticket, args=(‘person%s‘ % i, lock)) p.start() # 结果: # person1查询余票: 3 # person3查询余票: 3 # person0查询余票: 3 # person2查询余票: 3 # person4查询余票: 3 # person1买到票了 # person3买到票了 # person0买到票了 # person2没买到票 # person4没买到票 # 1、信号量的实现机制:计数器 + 锁实现的 # 信号量同步基于内部计数器,每调用一次acquire(),计数器减1;每调用一次release(),计数器加1.当计数器为0时,acquire() # 调用被阻塞。 # 互斥锁同时只允许一个线程更改数据,而信号量Semaphore是同时允许一定数量的线程更改数据(Samphore相当于有几把钥匙,lock只能有一把钥匙) import time import random from multiprocessing import Process from multiprocessing import Semaphore def changba(person, sem): # 在唱吧 唱歌 sem.acquire() # 第一次可以同时进来两个人 print(‘%s走进唱吧‘ % person) time.sleep(random.randint(3, 6)) # 每个人唱歌的时间 print(‘%s走出唱吧‘ % person) # 唱完走人 sem.release() # 把钥匙给下一个人 if __name__ == ‘__main__‘: sem = Semaphore(2) # 2把钥匙 for i in range(5): p = Process(target=changba, args=(‘person%s‘ % i, sem)) p.start() # 事件 Event # 事件主要提供了三个方法 # set、wait、clear。 # 事件处理的机制:全局定义了一个“Flag”, # 如果“Flag”值为False,那么当程序执行event.wait方法时就会阻塞, # 如果“Flag”值为True,那么event.wait方法时便不再阻塞。 # 阻塞事件 :wait()方法 # wait是否阻塞是看event对象内部的Flag # 控制Flag的值: # set() 将Flag的值改成True # clear() 将Flag的值改成False # is_set() 判断当前的Flag的值 # 红绿灯: import time import random from multiprocessing import Process from multiprocessing import Event def traffic_ligth(e): # 红绿灯 print(‘