第35篇 进程之间的通信 Queue Pipe 进程池Pool,p.apply()方法,p.apply_async()方法
Posted cavalier-chen
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第35篇 进程之间的通信 Queue Pipe 进程池Pool,p.apply()方法,p.apply_async()方法相关的知识,希望对你有一定的参考价值。
内容大纲:
进程之间的通讯
进程队列
管道
进程之间的数据共享
进程池
使用进程池 开启进程
提交任务
获得返回值
回调函数
1.进程队列
先进先出
from multiprocessing import Queue import queue q = Queue() q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get())
1 2 3
from multiprocessing import Queue import queue q = Queue() q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) print(q.get())#q已经被取空 没法取值 程序会被夯住
from multiprocessing import Queue import queue q = Queue() q.put(1) q.put(2) q.put(3) print(q.get()) print(q.get()) print(q.get()) print(q.get_nowait())#报错queue.Empty
1 2 3
from multiprocessing import Queue import queue q = Queue() q.put(1) q.put(2) q.put(3) while True: try: print(q.get_nowait()) except queue.Empty: break
1 2 3
from multiprocessing import Queue import queue q = Queue(3)#设置队列的最大容量 q.put(1) q.put(2) q.put(3) q.put(4)#队列已经放满了 程序被夯住
from multiprocessing import Queue import queue q = Queue(3)#设置队列的最大容量 while True: try: q.put_nowait(1)#队列放满了报出异常 except queue.Full: break while True: try: print(q.get_nowait()) except queue.Empty: break
1 1 1
#q.empty()q.full()
这两个方法不是很可靠,因为别的进程会随时往队列里面添加或者取走元素
from multiprocessing import Queue import queue q = Queue(3)#设置队列的最大容量 while True: try: q.put_nowait(1)#队列放满了报出异常 except queue.Full: break print(q.empty())#判断队列是否为空 print(q.full())#判断队列是否已满 while True: try: print(q.get_nowait()) except queue.Empty: break print(q.empty()) print(q.full())
False True 1 1 1 True False
from multiprocessing import Process,Queue def consume(q): print(q.get()) if __name__ == ‘__main__‘: q = Queue() p = Process(target= consume,args=(q,)) p.start() q.put({‘123‘:456})
{‘123‘: 456}
from multiprocessing import Process,Queue def consume(q): print(‘son --->‘,q.get()) q.put(‘abc‘) if __name__ == ‘__main__‘: q = Queue() p = Process(target= consume,args=(q,)) p.start() q.put({‘123‘:456}) p.join() print(‘Foo --->‘, q.get())
son - --> {‘123‘: 456} Foo - --> abc
2.什么是生产者消费者模型
import time import random from multiprocessing import Process,Queue def consumer(q,name): while True: food = q.get()#循环不停的从队列里面取走元素 if food is None:break#取到None,退出循环 time.sleep(random.uniform(0.5,1)) print(‘%s吃了:%s‘%(name,food)) def producer(q,name,food): for i in range(10): time.sleep(random.uniform(0.3,0.8)) print(‘%s 生产了:%s%s‘%(name,food,i)) q.put(food+str(i)) if __name__ == ‘__main__‘: q = Queue() c1 = Process(target=consumer,args=(q,‘alex‘)) c1.start() p1 = Process(target=producer,args=(q,‘沙县小吃‘,‘鸡腿‘)) p1.start() p1.join()#队列里面放元素 设置成一个同步事件. q.put(None)#生产(队列里面添加元素)结束之后 最后放一个None
沙县小吃 生产了:鸡腿0 沙县小吃 生产了:鸡腿1 alex吃了:鸡腿0 沙县小吃 生产了:鸡腿2 alex吃了:鸡腿1 沙县小吃 生产了:鸡腿3 沙县小吃 生产了:鸡腿4 alex吃了:鸡腿2 沙县小吃 生产了:鸡腿5 沙县小吃 生产了:鸡腿6 alex吃了:鸡腿3 沙县小吃 生产了:鸡腿7 alex吃了:鸡腿4 沙县小吃 生产了:鸡腿8 alex吃了:鸡腿5 沙县小吃 生产了:鸡腿9 alex吃了:鸡腿6 alex吃了:鸡腿7 alex吃了:鸡腿8 alex吃了:鸡腿9
3.可阻塞的队列 JoinableQueue
多了两个方法 q.task_done() q.join()
import time import random from multiprocessing import Process,JoinableQueue def consumer(q,name): while True: food = q.get()#循环不行的从队列里面取走元素 # if food is None:break 这句代码在JoinableQueue中就不需要了 time.sleep(random.uniform(0.5,1)) print(‘%s吃了:%s‘%(name,food)) q.task_done()#完成了任务向队列汇报 #只有消费者里面才需要汇报 def producer(q,name,food): for i in range(5): time.sleep(random.uniform(0.3,0.8)) print(‘%s 生产了:%s%s‘%(name,food,i)) q.put(food+str(i)) if __name__ == ‘__main__‘: jq = JoinableQueue()#可阻塞的队列 c1 = Process(target=consumer,args=(jq,‘alex‘)) c2 = Process(target=consumer,args=(jq,‘taibai‘)) c1.daemon = True c2.daemon = True c1.start() c2.start() p1 = Process(target=producer,args=(jq,‘沙县小吃‘,‘鸡腿‘)) p2 = Process(target=producer,args=(jq,‘黄焖鸡‘,‘炒米粉‘)) p1.start() p2.start() p1.join()#生产者把所有的元素都放到队列里面才停止 p2.join() jq.join()#可阻塞的队列 设置成阻塞的
程序执行完成后结束:
沙县小吃 生产了:鸡腿0 黄焖鸡 生产了:炒米粉0 黄焖鸡 生产了:炒米粉1 alex吃了:鸡腿0 沙县小吃 生产了:鸡腿1 黄焖鸡 生产了:炒米粉2 taibai吃了:炒米粉0 沙县小吃 生产了:鸡腿2 alex吃了:炒米粉1 黄焖鸡 生产了:炒米粉3 alex吃了:炒米粉2 taibai吃了:鸡腿1 沙县小吃 生产了:鸡腿3 黄焖鸡 生产了:炒米粉4 沙县小吃 生产了:鸡腿4 alex吃了:鸡腿2 taibai吃了:炒米粉3 alex吃了:鸡腿3 taibai吃了:炒米粉4 alex吃了:鸡腿4
4,什么是管道?
管道有左右两端,左边发送右边接收,或者右边发送,左边接收
from multiprocessing import Pipe left,right = Pipe() left.send(‘hello‘) print(right.recv())
hello
from multiprocessing import Process,Pipe def consumer(pipe): print(pipe[1].recv())#pipe[1].recv()管道的右边接收 if __name__ == ‘__main__‘: pipe = Pipe() Process(target=consumer,args=(pipe,)).start() pipe[0].send(‘hello‘)
hello
from multiprocessing import Process,Pipe def consumer(left,right): print(right.recv())#pipe[1].r ecv()管道的右边接收 if __name__ == ‘__main__‘: left,right = Pipe()#生成的是一个元组(左端,右端) Process(target=consumer,args=(left,right)).start() left.send(‘hello‘)
hello
管道端口的关闭
from multiprocessing import Pipe,Process def consumer(left,right): left.close() while True: try: print(right.recv()) except EOFError: break if __name__ == ‘__main__‘: left,right = Pipe() Process(target=consumer,args=(left,right)).start() right.close() for i in range(5): left.send(‘鸡腿%s‘%i) left.close()
鸡腿0 鸡腿1 鸡腿2 鸡腿3 鸡腿4
总结一下:
队列是基于管道实现的
管道是基于socket实现的
队列+锁 是一种简便的IPC机制,是的进程之间的数据变得安全,
什么是IPC inter-process-commucate进程之间的通讯
socket+pickle实现进程之间的通讯,同一台计算机通过文件的收发实现进程之间的通讯
5,什么是进程池?为什么要有进程池?
开启过多的进程并不能够提高效率,反而会降低效率
进程的分类:
计算密集型:
重分占用CPU,多进程可以充利用CPU的多核
适合开启多个进程
IO密集型:
大部分的时间都在阻塞队列,而不是在运行状态,
根本不适合开启多个进程
信号量,多进程,进程池的概念区别:
现在需要生产500件衣服,应该买几台机器?雇佣几名工人???
信号量 模式:
500件衣服要生产 500个任务
雇佣500个人 开启了500个进程
购买4台机器 4核CPU
多进程模式:
500件衣服要生产 500个任务
雇佣500个人 开启了500个进程
购买4台机器 4核CPU
进程池模式:
500件衣服要生产 500个任务
雇佣4个人 4个人一人一台机器,不停地生产
购买4台机器 4核CPU
import time from multiprocessing import Pool,Process def func(num): print(‘生产了第%s件衣服‘%num) if __name__ == ‘__main__‘: start = time.time() p = Pool(4)#创建进程池 池子的最大进程是4个进程 for i in range(100): p.apply_async(func,args=(i,))#异步提交 func到子进程中执行 p.close()#关闭池,用户不能再向池中提交任务 p.join()#阻塞,直到进程池中所有的进程都执行完毕,主进程才能结束 print(time.time()-start)
生产了第0件衣服 ... 生产了第99件衣服 2.899761915206909#时间消耗
#多进程的方式
import time from multiprocessing import Pool,Process def func(num): print(‘生产了第%s件衣服‘%num) if __name__ == ‘__main__‘: start = time.time() p_list = [] for i in range(10): p = Process(target=func,args=(i,)) p.start() p_list.append(p) for p in p_list: p.join() print(time.time()-start)
生产了第1件衣服 生产了第9件衣服 生产了第2件衣服 生产了第3件衣服 生产了第4件衣服 生产了第5件衣服 生产了第0件衣服 生产了第8件衣服 生产了第6件衣服 生产了第7件衣服 5.608381509780884
同步提交与异步提交的区别
import time,os from multiprocessing import Pool def task(num): time.sleep(1) print(‘%s : %s ‘%(num,os.getpid())) if __name__ == ‘__main__‘: p = Pool(4) for i in range(20): p.apply(task,args=(i,))#提交任务的方式是同步提交
0 : 4424 1 : 2748 2 : 9464 3 : 4176 #后面的pid是不断的重复上面4个pid(进程编号) 4 : 4424 5 : 2748 6 : 9464 7 : 4176 8 : 4424 9 : 2748 10 : 9464 11 : 4176 12 : 4424 13 : 2748 14 : 9464 15 : 4176 16 : 4424 17 : 2748 18 : 9464 19 : 4176
#同提交的方法可以得到返回值
import time,os from multiprocessing import Pool def task(num): time.sleep(1) print(‘%s : %s ‘%(num,os.getpid())) return num**2 if __name__ == ‘__main__‘: p = Pool(4) for i in range(20): res = p.apply(task,args=(i,))#提交任务的方式是同步提交 print(‘-->‘,res)
0 : 9544 --> 0 1 : 4760 --> 1 2 : 9028 --> 4 3 : 9572 --> 9 4 : 9544 --> 16 5 : 4760 --> 25 6 : 9028 --> 36 7 : 9572 --> 49 8 : 9544 --> 64 9 : 4760 --> 81 10 : 9028 --> 100 11 : 9572 --> 121 12 : 9544 --> 144 13 : 4760 --> 169 14 : 9028 --> 196 15 : 9572 --> 225 16 : 9544 --> 256 17 : 4760 --> 289 18 : 9028 --> 324 19 : 9572 --> 361
#异步提交 不能拿到任务的结果 但是可以拿到 任务提交的情况
import time,os from multiprocessing import Pool def task(num): time.sleep(1) print(‘%s : %s ‘%(num,os.getpid())) return num**2 if __name__ == ‘__main__‘: p = Pool() for i in range(5): res = p.apply_async(task,args=(i,))#提交任务的方式是异步提交 print(‘-->‘,res)
--> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C438> --> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C518> --> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C5C0> --> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C6A0> --> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C780>
import time,os from multiprocessing import Pool def task(num): time.sleep(1) print(‘%s : %s ‘%(num,os.getpid())) if __name__ == ‘__main__‘: p = Pool() for i in range(5): p.apply_async(task,args=(i,))#提交任务的方式是同步提交 p.close()#关闭池子 不能再往里面添加任务 p.join()#进程池设置成阻塞 任务完成了主进程成才能关闭
0 : 10100 1 : 624 2 : 8288 3 : 9532 4 : 10100
进程池总结:
p = Pool()实例化的时候进程的个数 默认值是cpu的个数u,或者设置成cpu+1
提交任务:
同步提交:apply(函数名,args =())
#有返回值,返回值是子函数逇返回值
#一个任务接着一个任务按顺序同步执行,没有任何并发的结果
异步提交:apply_async
#返回值是任务提交的结果
#p.close()
#p.join()
#必须先close()再join(),p设置成阻塞,直到p中所有的进程都执行完毕,才结束主进程
#这种法法取值,与同步提交没有区别
import time,os from multiprocessing import Pool def task(num): time.sleep(1) print(‘%s : %s ‘%(num,os.getpid())) return num**2 if __name__ == ‘__main__‘: p = Pool() for i in range(5): res = p.apply_async(task,args=(i,))#提交任务的方式是同步提交 print(res.get())
0: 8836 0 1: 9248 1 2: 9780 4 3: 2588 9 4: 8836 16
#将计算的结果放进列表
import time,os from multiprocessing import Pool def task(num): time.sleep(1) print(‘%s : %s ‘%(num,os.getpid())) return num**2 if __name__ == ‘__main__‘: p = Pool() res_lsit = [] for i in range(5): res = p.apply_async(task,args=(i,))#提交任务的方式是同步提交 res_lsit.append(res)#将计算的结果放进列表 for res in res_lsit: print(res.get())
0 : 9424 0 1 : 912 1 2 : 8572 4 3 : 7436 9 4 : 9424 16
#p.map(函数名,参数)
import time,os from multiprocessing import Pool def task(num): time.sleep(1) print(‘%s : %s ‘%(num,os.getpid())) return num**2 if __name__ == ‘__main__‘: p = Pool() p.map(task,range(5))
0 : 2880 1 : 8388 2 : 7888 3 : 9540 4 : 2880
以上是关于第35篇 进程之间的通信 Queue Pipe 进程池Pool,p.apply()方法,p.apply_async()方法的主要内容,如果未能解决你的问题,请参考以下文章