python多进程
Posted Wualin
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了python多进程相关的知识,希望对你有一定的参考价值。
多进程
- 进程:正在进行的过程或者说是一个任务,而负责执行任务则是cpu
- 同一个程序执行两次是两次进程
- 并发:
- 并行:基于多核cpu
unix开子进程的拷贝一份父进程的数据
进行的三个状态:运行,阻塞,就绪
在python中如何开启子进程
- multiprocessing模块中的process类
# 方式1
from multiprocessing import Process
import time
def task(name):
print(‘%s is running‘%name)
time.sleep(5)
print(‘%s is done‘%name)
if __name__ ==‘__main__‘:
p = Process(target=task,args=(‘子进程1‘,))
p.start() #仅仅只是给操作系统发送了一个信号
#方式二:自定义类继承自process
from multiprocessing import Process
import time
class MyProcess(Process):
def __init__(self,name):
super().__init__()
self.name = name
def run(self): #函数名必须是run
print(‘%s is running‘%self.name)
time.sleep(5)
print(‘%s is done‘%self.name)
if __name__ == ‘__main__‘:
p = MyProcess(‘子进程1‘)
p.start()#本质就是在调用p.run
print(‘进‘)
查看进程的pid与ppid(进程id)
- 补充:
- windows查看正在执行的进程:tasklist | findstr pycharm
- Mac os查看正在执行的进程:ps aus|grep pycharm
from multiprocessing import Process
import time
import os
def task(name):
print(‘%s is running,parent id is <%s>‘%(os.getpid(),os.getppid()))
time.sleep(5)
print(‘%s is done‘%os.getpid())
if __name__ ==‘__main__‘:
p = Process(target=task,args=(‘子进程1‘,))
p.start() #仅仅只是给操作系统发送了一个信号
print(‘主‘,os.getpid())
process对象的其他属性或方法
- join()等子进程结束完毕才会继续运行主进程,主进程结束后所有的僵尸进程结束
- start()只是向操作系统发送信号,并不只是把进程立马开起来,如果连续有几个start有可能执行的先后顺序会错乱,如果start方法后面立马接一个join,多个子进程会变成串行
- is_alive()查看子进程时候已经结束
- terminate()杀死这个子进程
- pid()查看进程id
- name()查看这个对象的名字
守护进程
- 当主进程执行结束子进程跟着结束:将daemon属性设置为true,守护进程不能有子进程。代码执行到程序最后一行代表程序运行结束
互斥锁
- 把并发变成串行:使用multiprocessing模块下的Lock对象
#牺牲效率实现子进程串行
from multiprocessing import Process,Lock
import time
def tack(name,lock):
lock.acquire()#加锁
print(‘%s,1‘%name)
time.sleep(1)
print(‘%s,2‘%name)
time.sleep(1)
print(‘%s,3‘%name)
lock.release()#释放锁
if __name__ == ‘__main__‘:
lock = Lock()#实例化锁对象
for i in range(3):
p = Process(target=tack,args=(‘进程%s‘%i,lock))#把锁传到子进程中
p.start()
- 互斥锁与join的区别:join确实能实现代码串行,但join把整个代码变成串行,但互斥锁可以把部分代码变成串行
事件
- 一个信号可以使所有的进程都进入阻塞状态,也可以控制所有的进程解除阻塞
- 一个事件被创建之后,默认是阻塞状态
from multiprocessing import Event
e = Event()# 创建了一事件
print(e.is_set())# 查看一个事件的状态,默认是阻塞
e.set() # 将这个事件状态改为True
print(e.is_set())
e.wait()# 根据e.is_set()的值决定是否阻塞,如果是True就是不阻塞,如果是False就是阻塞状态
e.clear() # 将这个事件状态改为False
- 事件模拟红绿灯:
import time
import random
from multiprocessing import Process,Event
def light(e):
while True:
if e.is_set():
e.clear()
print(‘红灯亮了‘)
else:
e.set()
print(‘绿灯亮了‘)
time.sleep(5)
def cars(i,e):
if not e.is_set():
print(‘%s 在等待‘%i)
e.wait()
print(‘%s 通行了‘%i)
if __name__ == ‘__main__‘:
e = Event()
p1 = Process(target=light,args=(e,))
p1.start()
for i in range(20):
p = Process(target=cars,args=(i,e))
p.start()
time.sleep(2)
队列
- multiprocessing有提供基于ipc通信类(inter-process communication)
from multiprocessing import Queue
q = Queue(4)#指定队列大小,如果不指定大小则大小为无穷尽
q.put(‘hello‘)#插入数据到队列中
print(q.full())#判断队列中数据是否满了
q.get()#从队列读取并删除一个数据
print(q.empty())#判断队列中数据是否空了
- 生产者消费这模型:
- 生产者:生产数据任务
- 消费者:消费数据任务
from multiprocessing import Process,Queue
import time
def producer(q):
for i in range(3):
res = ‘包子%s‘%i
time.sleep(0.5)
q.put(res)
print(‘生产者生产了%s‘%res)
def consumer(q):
while True:
res = q.get()
if res == None:break
time.sleep(0.7)
print(‘消费者吃了%s‘%res)
if __name__ == ‘__main__‘:
q = Queue()
p1 = Process(target=producer,args=(q,))
c1 = Process(target=consumer,args=(q,))
p1.start()
c1.start()
p1.join()
c1.join()
q.put(None)
- JoinableQueue:
- 消费者:每次获取一个数据,处理一个数据,并且发送一个记号:标志一个数据被处理成功,计数器-1
- 生产者:每一次生产一个数据,且每一次生产的数据放在队列中计数器+1,当生产者全部生产完毕之后发送一个join:已经停止生产数据且要等待之前生产的数据被消费完,当数据都被处理完时,join阻塞结束
from multiprocessing import Process,JoinableQueue
import time
def producer(q):
for i in range(3):
res = ‘包子%s‘%i
time.sleep(0.5)
q.put(res)
print(‘生产者生产了%s‘%res)
q.join() #阻塞,直到一个队列中的所有数据,全部被处理完毕
def consumer(q):
while True:
res = q.get()
time.sleep(0.7)
print(‘消费者吃了%s‘%res)
q.task_done() # 处理完一个数据,队列中的计数器-1
if __name__ == ‘__main__‘:
q = JoinableQueue()
p1 = Process(target=producer,args=(q,))
c1 = Process(target=consumer,args=(q,))
p1.start()
c1.daemon = True #守护进程,主进程中的代码执行完毕后,子进程自动结束
c1.start()
p1.join()
管道
- 在进程之间传递信息(数据)
from multiprocessing import Pipe,Process
def func(conn1,conn2):
conn1.close()
while True:
try:
msg = conn2.recv()
print(msg)
except EOFError:
print(‘子进程结束‘)
conn2.close()
break
if __name__ == ‘__main__‘:
conn1,conn2 = Pipe()
Process(target=func,args=(conn1,conn2)).start()
conn2.close()
for i in range(5):
conn1.send(‘吃屎了你‘)
conn1.close()
进程池
- 创建进程池的过程
- 创建一个属于进程的池子
- 这个池子指定能存放多少个进程
- 开启进程
from multiprocessing import Pool,Process
import time
import os
def func(i):
print(‘%s 进程正在运行‘%i,os.getpid())
time.sleep(2)
print(‘%s 进程运行结束‘%i,os.getpid())
if __name__ == ‘__main__‘:
pool = Pool(3)
for i in range(10):
# pool.apply(func,args=(i,)) 同步调用
pool.apply_async(func,args=(i,)) # 异步调用
pool.close() # 结束进程池接收任务
pool.join() # 感知进程池中的任务执行结束
# apply_async必须是与close、join一起使用
- 进程池的返回值
from multiprocessing import Pool
def func(i):
return i*i
if __name__ == ‘__main__‘:
p = Pool(5)
res_lis = []
for i in range(10):
res = p.apply_async(func,args=(i,))
res_lis.append(res)
for res in res_lis:
print(res.get())# res.get 默认是阻塞的,因为需要接收进程处理的结果,之后接收到结果之后才可以继续执行,所以
# 所以将进程的返回结果放在一个列表中,然后循环这个列表,再执行get就不会阻塞了
p.close()
p.join()
- 进程池的回调函数
from multiprocessing import Pool
import os
def func1(i):
print(‘in func1‘,os.getpid())
return i*i
def func2(ii):
print(‘in func2‘,os.getpid())
print(ii)
if __name__ == ‘__main__‘:
‘‘‘
1. func1执行结果当作func2的参数,func1进程执行结束后返回结果,通过callback调用func2,将func1的执行返回结果当作参数传递给func2执行
2. func1是一个新的进程
3. func2是在主进程中执行的
‘‘‘
p = Pool(5)
p.apply_async(func1,args=(2,),callback=func2)
p.close()
p.join()
以上是关于python多进程的主要内容,如果未能解决你的问题,请参考以下文章