多进程—进程同步控制,IPC
Posted gkx0731
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多进程—进程同步控制,IPC相关的知识,希望对你有一定的参考价值。
multiprocessing包—Process模块开启多进程的两种方式,Process的方法,守护进程
进程同步控制—multiprocessing.Lock multiprocessing.Semaphore multiprocessing.Event
进程间通信(IPC)— multiprocessing.Pipe multiprocessing.Queue
进程间的数据共享 — multiprocessing.Manager
1.是运行中的程序; 2.是系统最小的资源分配单位 3.为多个任务之间的数据安全和内存隔离做约束
multiprocessing:提到多进程就要想到它
multiprocess不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。
一、multiprocessing.Process开启多进程:
#process方法: start,join,terminate,isalive 属性 name,pid,daemon(True为守护进程)
#除了子进程代码,其他都是主进程 #注意进程的三态转换 就绪态-(时间片)-执行态,阻塞态 #process方法: start,join,terminate,isalive 属性 name,pid,daemon(True为守护进程) import time from multiprocessing import Process def func(): #在子进程中执行的代码,因为被注册了 print(‘开启一个子进程‘) time.sleep(1) print(‘证明,print1和print*是一起启动的‘) #其中p是一个主进程,实例化一个进程对象, # p.start()这句话为主进程代码。(但是它告诉系统启动子进程,操作系统创建新进程,执行进程中的代码) if __name__ == ‘__main__‘:#·只有在windows下开启子进程,才需要这句话,其他系统不用 p = Process(target=func) #注册函数。 可以理解为,把func这个函数,放到一个新的py文件里执行 #p是一个进程对象,告诉系统开启一个子进程 p.start() #启动子进程,子进程启动起来后,进入就绪排队状态,延迟0.01还是0.001秒看排队长度 print(‘*‘*5) #其中 这里的print和p.start是一起启动的,不管有多少个print,都同步启动 #假设如果我们直接调用func()函数,那么print(‘*‘*5)则与函数此时是同步的,要等函数运行完才能执行print(‘*‘*5) #但是此时我们用子进程p.start告诉系统,要开启一个子进程去执行func,然后系统就会给func一个进程id,开始运行func.同时主程序会继续运行自己的代码print(‘*‘*5) # 此时程序变为了异步了,不影响p.start后面的代码运行,也就是说,p.start告诉系统开启子进程调用func后,马上运行 #print(‘*‘*5),然后进程开启后,就绪状态进入了执行状态,再运行 func。 # 此时运行func和执行print(‘*‘*5),变为了异步,不是同步了。这两段代码已经是在两条马路上跑的两辆车了 def f(name): #这里是子进程代码 print(‘hello‘, name) print(‘我是子进程‘) if __name__ == ‘__main__‘: #以下全部都是主进程代码,包括import模块那 p = Process(target=f, args=(‘bob‘,)) #主进程 p.start() #开启了一个子进程,理解为把f放到新的py里执行,但是实际上他还有后续的主进程代码【print(‘执行主进程的内容了‘)】要执行 time.sleep(1) #主进程的代码 print(‘执行主进程的内容了‘) #主进程的代码 #搭配上面的例子食用更佳。如果上面的sleep是0.1秒,就会先执行上面的print2。如果是1秒,则肯定是先执行,print(‘执行主进程的内容了‘) #此时主进程的代码,和子进程要启动的代码。就是异步执行
#pid processid进程id ppid :parent process id import os,time from multiprocessing import Process # print(‘py中的进程‘,os.getpid())#执行这整段代码,这句话会运行两次第一次指向父进程,第二次指向子进程 def func(name): print(‘参数是‘,name) print(‘子进程‘,os.getpid()) print(‘子进程的父进程‘,os.getppid()) #就是下面这一段的pid time.sleep(1) # name = input(‘111111111‘) #EOFError: EOF when reading a line print(‘子进程,调皮一下‘) if __name__ == ‘__main__‘: p = Process(target=func,args=(‘gkx‘,)) #注意传参数时候 args=(‘gkx‘,) 逗号一定要加,参数可以是*args,不能是**kwargs p.start() print(‘父进程‘,os.getpid()) print(‘父进程的父进程‘,os.getppid()) #这个就是pycharm的进程id
# 进程的生命周期 # 主进程 # 子进程 # 开启了子进程的主进程 : # 主进程自己的代码如果长,等待自己的代码执行结束, # 子进程的执行时间长,主进程会在主进程代码执行完毕之后等待子进程执行完毕之后 主进程才结束 #···子进程不一定要依赖运行着的父进程 #至于父进程如果关闭了 但是子进程没运行完, #比如 控制台 python XXX.py 运行了py文件,此时如果关闭了控制台,py文件是否会跟着关闭? #在linux中 如果 python XXX.py & 后面跟上&则 py文件会一直在后台运行 #所以,关闭父进程,子进程是否关闭要看是怎么规定的
import os from multiprocessing import Process class MyProcess(Process): def __init__(self,arg1,arg2): #如果属性都有默认值,那么在实例化的时候可以不传属性,类型函数的传值方式 super().__init__()#父类的属性都有默认值所以不用列出 self.arg1 = arg1 self.arg2 = arg2 def run(self): print(self.name) print(self.pid) print(‘arg1,arg2‘,self.arg1,self.arg2) # print(‘run参数‘,name) print(os.getpid()) if __name__ == ‘__main__‘: print(‘主进程‘,os.getpid()) p1 = MyProcess(‘1‘,‘2‘) # p1.run(‘gkx‘) p1.start() p2 = MyProcess(‘3‘,‘4‘) p2.start() #~~~第二种方法的条件: #1.自定义类,继承Process类 #2.定义个名为 run的方法,run方法中是在子进程中执行的代码 #3.子进程的参数,通过 __init__的方法初始化,并调用父类的__init__
#注意进程间是数据隔离的,所以才会有进程间的通信需求,在博客的后面会提到
from multiprocessing import Process def func(): num = input(‘>>>‘) #EOFError: EOF when reading a line print(num) if __name__ == ‘__main__‘: Process(target=func).start() #子进程中不能有input #因为在pycharm中开启多个进程,软件帮你优化后,显示在同一个控制台 #但是实际上它还是两个进程的,感受不到子进程的input,所以报错 #因此,子进程是不能input 的 #不同于聊天,QQ用的是UDP,是客户端和客户端之间的通信
(1)Process模块的几个方法:
import time,os from multiprocessing import Process def func(arg1,arg2): print(‘*‘*arg1) # time.sleep(2) #有种效果:让时间片轮转一下 print(‘=‘*arg2) if __name__ == ‘__main__‘: p = Process(target=func ,args=(10,20,)) p.start() print(‘哈哈哈哈哈‘) p.join() #感知一个子进程的结束,从异步变成了同步。有点像把子进程代码拼进主进程 print(‘====== 运行完了‘)
# 多进程代码
# from multiprocessing import Process ,一定要在 if __name__ == ‘__main__‘:下运行
# 方法
# 进程对象.start() 开启一个子进程
# 进程对象.join() 感知一个子进程的结束
# 进程对象.terminate() 结束一个子进程
# 进程对象.is_alive() 查看某个子进程是否还在运行
# 属性
# 进程对象.name 进程名
# 进程对象.pid 进程号
# 进程对象.daemon 值为True的时候,表示新的子进程是一个守护进程
# 守护进程 随着主进程代码的执行结束而结束
# 一定在start之前设置
(2)开启多个子进程
使用 空列表 [p.join() for p in p_lst] 保证多个子进程运行完后,才运行主进程
import time,os from multiprocessing import Process def func(arg1,arg2): print(‘*‘*arg1) print(‘!‘*arg1) # time.sleep(2) #由于是异步执行,这里sleep2秒,有点像几条马路上的一个红路灯,把所有子进程拦住,红绿灯一消失,所有进程就开始执行 #所以开启多个子进程,也只会等待2秒而已 print(‘=‘*arg2) if __name__ == ‘__main__‘: p_lst = [] for i in range(10): #按顺序告诉系统要开启多个子进程,但是系统并不会按顺序执行!有些是执行态,有些会是就绪态 p = Process(target=func ,args=(10*i,20*i,)) p_lst.append(p) p.start() #p.join()如果在for循环里加了p.join则类似变为了同步 [p.join() for p in p_lst] #保证print(‘====== 运行完了‘)肯定在最后运行 # p.join() #由于系统执行顺序不确定,print(‘====== 运行完了‘) 会出现在不确定的地方 print(‘+++++++ 运行完了‘)
from multiprocessing import Process import os def func(filename,content): with open(filename,‘w‘) as f: f.write(content*10*‘*‘) if __name__ == ‘__main__‘: p_lst = [] for i in range(5): p = Process(target=func,args=(‘info%s‘%i,i)) #这里进程是同时启动的,但是有些进入执行态,有些进入就绪态 p_lst.append(p) p.start() # p.join() i=0等待文件写入,i=1 等待文件写入........ [p.join() for p in p_lst] #等待大家跑步结束,之前的所有进程必须在这里都执行完才能继续往 下执行 #这里, i=1,i=2,i=3,i=4,i=5是同时写入的,全部写入完,才打印文件名 print([i for i in os.walk(os.getcwd())]) #在使用for生成进程的时候,进程是几乎同时进行的,浪费的时间只是for i in range(5) 这极短的时间 #这个例子适合场景: 所有的子进程需要异步执行,但是需要所有子进程做完了的结果,返回给主进程
在用for循环的时候,肯定是按顺序的。但是for循环只是极短的时间,生成的子进程,此时有些进程进入就绪态,有些进入执行态,所以顺序不一
(3)守护进程:
#守护进程---> 主进程结束的时候,子进程也主动结束 import time from multiprocessing import Process def func(): # 守护进程 会 随着 主进程的代码执行完毕 而 结束 while True: time.sleep(0.2) print(‘我还活着‘) def fun2(): print(‘in the func22222222222‘) time.sleep(8) print(‘func2 finished‘) if __name__ == ‘__main__‘: p = Process(target=func) p.daemon=True #设置子进程为守护进程,一定要在 start方法前 设置好 p.start() p.terminate()#强制结束一个子进程 time.sleep(0.0000000000000000000001) print(p.is_alive()) #检验子进程是否或者。虽然上一个语句结束了,但是这里操作系统正在回收,但是此时响应时间非常短,所以会返回True。sleep一下就是false了 p2 = Process(target=fun2) p2.start() # p2.join() i = 0 while i<5: print(‘我是socket server.‘) time.sleep(1) i +=1 # 守护进程 会 随着 主进程的代码执行完毕 而 结束 # 在主进程内结束一个子进程 p.terminate() # 结束一个进程不是在执行方法之后立即生效,需要一个操作系统响应的过程 # 检验一个进程是否活着的状态 p.is_alive() # p.name p.pid 这个进程的名字和进程号
(4)使用多进程实现socket服务端的并发
import socket from multiprocessing import Process def server(conn): conn.send(‘你好‘.encode(‘utf-8‘)) msg = conn.recv(1024).decode(‘utf-8‘) print(msg) conn.close() if __name__ == ‘__main__‘: sk = socket.socket() sk.bind((‘127.0.0.1‘,8080)) sk.listen() while True: conn,addr = sk.accept() p = Process(target=server,args=(conn,)) p.start() sk.close()
import socket sk = socket.socket() sk.connect((‘127.0.0.1‘,8080)) msg = sk.recv(1024).decode(‘utf-8‘) print(msg) ret = input(‘>>>> ‘) sk.send(ret.encode(‘utf8‘)) sk.close()
当多个进程使用同一份数据资源的时候,就会引发数据安全或顺序混乱问题。
(1)multiprocessing.Lock 同步锁/互斥锁。使用锁来保证数据安全:
#加锁可以保证多个进程修改同一块数据时,同一时间只能有一个任务可以进行修改,即串行的修改,没错,速度是慢了,但牺牲了速度却保证了数据安全。 虽然可以用文件共享数据实现进程间通信,但问题是: 1.效率低(共享数据基于文件,而文件是硬盘上的数据) 2.需要自己加锁处理 #因此我们最好找寻一种解决方案能够兼顾:1、效率高(多个进程共享一块内存的数据)2、帮我们处理好锁问题。这就是mutiprocessing模块为我们提供的基于消息的IPC通信机制:队列和管道。 队列和管道都是将数据存放于内存中 队列又是基于(管道+锁)实现的,可以让我们从复杂的锁问题中解脱出来, 我们应该尽量避免使用共享数据,尽可能使用消息传递和队列,避免处理复杂的同步和锁问题,而且在进程数目增多时,往往可以获得更好的可获展性。
#会造成数据不安全的地方 要加锁 lock.acquire lock.release from multiprocessing import Process,Lock import json,time def tick(): with open(‘ticket_num‘,) as f: ret = json.load(f) time.sleep(0.1) print(‘余票: %s‘%ret[‘ticket‘]) def buy_tick(i,lock): #with lock lock.acquire() #相当于拿钥匙才能进门。一旦有进程拿了钥匙,其他进程在这里拿不到,就会阻塞 with open(‘ticket_num‘,) as f: #阻塞到有人还钥匙为止 dic = json.load(f) time.sleep(0.1) if dic[‘ticket‘] > 0: dic[‘ticket‘] -= 1 print(‘