第35篇 进程之间的通信 Queue Pipe 进程池Pool,p.apply()方法,p.apply_async()方法

Posted cavalier-chen

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了第35篇 进程之间的通信 Queue Pipe 进程池Pool,p.apply()方法,p.apply_async()方法相关的知识,希望对你有一定的参考价值。

内容大纲:
进程之间的通讯
进程队列
管道

进程之间的数据共享
进程池
使用进程池 开启进程
提交任务
获得返回值
回调函数
1.进程队列
先进先出
from multiprocessing import Queue
    import queue
    q = Queue()
    q.put(1)
    q.put(2)
    q.put(3)
    print(q.get())
    print(q.get())
    print(q.get())

  



    1
    2
    3

  



from multiprocessing import Queue
import queue
q = Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
print(q.get())#q已经被取空 没法取值 程序会被夯住

  



from multiprocessing import Queue
import queue
q = Queue()
q.put(1)
q.put(2)
q.put(3)
print(q.get())
print(q.get())
print(q.get())
print(q.get_nowait())#报错queue.Empty

  



1
2
3

  




from multiprocessing import Queue
import queue
q = Queue()
q.put(1)
q.put(2)
q.put(3)
while True:
    try:
        print(q.get_nowait())
    except queue.Empty:
        break

  


1
2
3

  



from multiprocessing import Queue
import queue
q = Queue(3)#设置队列的最大容量
q.put(1)
q.put(2)
q.put(3)
q.put(4)#队列已经放满了 程序被夯住

  



from multiprocessing import Queue
import queue
q = Queue(3)#设置队列的最大容量
while True:
    try:
        q.put_nowait(1)#队列放满了报出异常
    except queue.Full:
        break

while True:
    try:
        print(q.get_nowait())
    except queue.Empty:
        break

  


1
1
1

  




#q.empty()q.full()
这两个方法不是很可靠,因为别的进程会随时往队列里面添加或者取走元素

from multiprocessing import Queue
import queue
q = Queue(3)#设置队列的最大容量
while True:
    try:
        q.put_nowait(1)#队列放满了报出异常
    except queue.Full:
        break
print(q.empty())#判断队列是否为空
print(q.full())#判断队列是否已满
while True:
    try:
        print(q.get_nowait())
    except queue.Empty:
        break
print(q.empty())
print(q.full())

  


False
True
1
1
1
True
False

  




from multiprocessing import Process,Queue
def consume(q):
    print(q.get())

if __name__ == ‘__main__‘:
    q = Queue()
    p = Process(target= consume,args=(q,))
    p.start()
    q.put({‘123‘:456})

  


{‘123‘: 456}


from multiprocessing import Process,Queue
def consume(q):
    print(‘son --->‘,q.get())
    q.put(‘abc‘)

if __name__ == ‘__main__‘:
    q = Queue()
    p = Process(target= consume,args=(q,))
    p.start()
    q.put({‘123‘:456})
    p.join()
    print(‘Foo --->‘, q.get())

  



 son - --> {‘123‘: 456}
    Foo - --> abc

  


2.什么是生产者消费者模型
import time
import random
from multiprocessing import Process,Queue

def consumer(q,name):
    while True:
        food = q.get()#循环不停的从队列里面取走元素
        if food is None:break#取到None,退出循环
        time.sleep(random.uniform(0.5,1))
        print(‘%s吃了:%s‘%(name,food))

def producer(q,name,food):
    for i in range(10):
        time.sleep(random.uniform(0.3,0.8))
        print(‘%s 生产了:%s%s‘%(name,food,i))
        q.put(food+str(i))

if __name__ == ‘__main__‘:
    q = Queue()

    c1 = Process(target=consumer,args=(q,‘alex‘))
    c1.start()

    p1 = Process(target=producer,args=(q,‘沙县小吃‘,‘鸡腿‘))
    p1.start()

    p1.join()#队列里面放元素 设置成一个同步事件.
    q.put(None)#生产(队列里面添加元素)结束之后 最后放一个None

  



沙县小吃 生产了:鸡腿0
沙县小吃 生产了:鸡腿1
alex吃了:鸡腿0
沙县小吃 生产了:鸡腿2
alex吃了:鸡腿1
沙县小吃 生产了:鸡腿3
沙县小吃 生产了:鸡腿4
alex吃了:鸡腿2
沙县小吃 生产了:鸡腿5
沙县小吃 生产了:鸡腿6
alex吃了:鸡腿3
沙县小吃 生产了:鸡腿7
alex吃了:鸡腿4
沙县小吃 生产了:鸡腿8
alex吃了:鸡腿5
沙县小吃 生产了:鸡腿9
alex吃了:鸡腿6
alex吃了:鸡腿7
alex吃了:鸡腿8
alex吃了:鸡腿9

  




3.可阻塞的队列 JoinableQueue
多了两个方法 q.task_done() q.join()
import time
import random
from multiprocessing import Process,JoinableQueue

def consumer(q,name):
    while True:
        food = q.get()#循环不行的从队列里面取走元素
        # if food is None:break 这句代码在JoinableQueue中就不需要了
        time.sleep(random.uniform(0.5,1))
        print(‘%s吃了:%s‘%(name,food))
        q.task_done()#完成了任务向队列汇报 #只有消费者里面才需要汇报
def producer(q,name,food):
    for i in range(5):
        time.sleep(random.uniform(0.3,0.8))
        print(‘%s 生产了:%s%s‘%(name,food,i))
        q.put(food+str(i))

if __name__ == ‘__main__‘:

    jq = JoinableQueue()#可阻塞的队列

    c1 = Process(target=consumer,args=(jq,‘alex‘))
    c2 = Process(target=consumer,args=(jq,‘taibai‘))
    c1.daemon = True
    c2.daemon = True
    c1.start()
    c2.start()

    p1 = Process(target=producer,args=(jq,‘沙县小吃‘,‘鸡腿‘))
    p2 = Process(target=producer,args=(jq,‘黄焖鸡‘,‘炒米粉‘))
    p1.start()
    p2.start()

    p1.join()#生产者把所有的元素都放到队列里面才停止
    p2.join()

    jq.join()#可阻塞的队列 设置成阻塞的

  



程序执行完成后结束:
沙县小吃 生产了:鸡腿0
     黄焖鸡 生产了:炒米粉0
     黄焖鸡 生产了:炒米粉1
alex吃了:鸡腿0
     沙县小吃 生产了:鸡腿1
     黄焖鸡 生产了:炒米粉2
taibai吃了:炒米粉0
     沙县小吃 生产了:鸡腿2
alex吃了:炒米粉1
     黄焖鸡 生产了:炒米粉3
alex吃了:炒米粉2
taibai吃了:鸡腿1
     沙县小吃 生产了:鸡腿3
     黄焖鸡 生产了:炒米粉4
     沙县小吃 生产了:鸡腿4
alex吃了:鸡腿2
taibai吃了:炒米粉3
alex吃了:鸡腿3
taibai吃了:炒米粉4
alex吃了:鸡腿4

  



4,什么是管道?
管道有左右两端,左边发送右边接收,或者右边发送,左边接收
from multiprocessing import Pipe
left,right = Pipe()
left.send(‘hello‘)
print(right.recv())

  


hello

  



from multiprocessing import  Process,Pipe
def consumer(pipe):
    print(pipe[1].recv())#pipe[1].recv()管道的右边接收
if __name__ == ‘__main__‘:
    pipe = Pipe()
    Process(target=consumer,args=(pipe,)).start()
    pipe[0].send(‘hello‘)

  

hello

  


from multiprocessing import  Process,Pipe
def consumer(left,right):
    print(right.recv())#pipe[1].r
    ecv()管道的右边接收
if __name__ == ‘__main__‘:
    left,right = Pipe()#生成的是一个元组(左端,右端)
    Process(target=consumer,args=(left,right)).start()
    left.send(‘hello‘)

  

hello

  



管道端口的关闭
from multiprocessing import  Pipe,Process
def consumer(left,right):
    left.close()
    while True:
        try:
            print(right.recv())
        except EOFError:
            break
if __name__ == ‘__main__‘:
    left,right = Pipe()
    Process(target=consumer,args=(left,right)).start()
    right.close()
    for i in range(5):
        left.send(‘鸡腿%s‘%i)
    left.close()

  

    鸡腿0
    鸡腿1
    鸡腿2
    鸡腿3
    鸡腿4

  



总结一下:
队列是基于管道实现的
管道是基于socket实现的
队列+锁 是一种简便的IPC机制,是的进程之间的数据变得安全,
什么是IPC inter-process-commucate进程之间的通讯
socket+pickle实现进程之间的通讯,同一台计算机通过文件的收发实现进程之间的通讯


5,什么是进程池?为什么要有进程池?
开启过多的进程并不能够提高效率,反而会降低效率

进程的分类:
计算密集型:
重分占用CPU,多进程可以充利用CPU的多核
适合开启多个进程
IO密集型:
大部分的时间都在阻塞队列,而不是在运行状态,
根本不适合开启多个进程

信号量,多进程,进程池的概念区别:
现在需要生产500件衣服,应该买几台机器?雇佣几名工人???

信号量 模式:
500件衣服要生产 500个任务
雇佣500个人 开启了500个进程
购买4台机器 4核CPU

多进程模式:
500件衣服要生产 500个任务
雇佣500个人 开启了500个进程
购买4台机器 4核CPU

进程池模式:
500件衣服要生产 500个任务
雇佣4个人 4个人一人一台机器,不停地生产
购买4台机器 4核CPU

import  time
from  multiprocessing import Pool,Process
def func(num):
    print(‘生产了第%s件衣服‘%num)

if __name__ == ‘__main__‘:
    start = time.time()
    p = Pool(4)#创建进程池 池子的最大进程是4个进程

    for i in range(100):
        p.apply_async(func,args=(i,))#异步提交 func到子进程中执行
    p.close()#关闭池,用户不能再向池中提交任务
    p.join()#阻塞,直到进程池中所有的进程都执行完毕,主进程才能结束
    print(time.time()-start)

  


生产了第0件衣服
...
生产了第99件衣服
2.899761915206909#时间消耗

  



#多进程的方式
import  time
from  multiprocessing import Pool,Process
def func(num):
    print(‘生产了第%s件衣服‘%num)

if __name__ == ‘__main__‘:
    start = time.time()
    p_list = []
    for i in range(10):
        p = Process(target=func,args=(i,))
        p.start()
        p_list.append(p)
    for p in p_list:
        p.join()
    print(time.time()-start)

  



生产了第1件衣服
生产了第9件衣服
生产了第2件衣服
生产了第3件衣服
生产了第4件衣服
生产了第5件衣服
生产了第0件衣服
生产了第8件衣服
生产了第6件衣服
生产了第7件衣服
5.608381509780884

  




同步提交与异步提交的区别
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))

if __name__ == ‘__main__‘:
    p = Pool(4)
    for i in range(20):
        p.apply(task,args=(i,))#提交任务的方式是同步提交

  



0 : 4424
1 : 2748
2 : 9464
3 : 4176 #后面的pid是不断的重复上面4个pid(进程编号)
4 : 4424
5 : 2748
6 : 9464
7 : 4176
8 : 4424
9 : 2748
10 : 9464
11 : 4176
12 : 4424
13 : 2748
14 : 9464
15 : 4176
16 : 4424
17 : 2748
18 : 9464
19 : 4176

  




#同提交的方法可以得到返回值
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))
    return num**2

if __name__ == ‘__main__‘:
    p = Pool(4)
    for i in range(20):
        res = p.apply(task,args=(i,))#提交任务的方式是同步提交
        print(‘-->‘,res)

  



0 : 9544
--> 0
1 : 4760
--> 1
2 : 9028
--> 4
3 : 9572
--> 9
4 : 9544
--> 16
5 : 4760
--> 25
6 : 9028
--> 36
7 : 9572
--> 49
8 : 9544
--> 64
9 : 4760
--> 81
10 : 9028
--> 100
11 : 9572
--> 121
12 : 9544
--> 144
13 : 4760
--> 169
14 : 9028
--> 196
15 : 9572
--> 225
16 : 9544
--> 256
17 : 4760
--> 289
18 : 9028
--> 324
19 : 9572
--> 361

  



#异步提交 不能拿到任务的结果 但是可以拿到 任务提交的情况
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))
    return num**2

if __name__ == ‘__main__‘:
    p = Pool()
    for i in range(5):
        res = p.apply_async(task,args=(i,))#提交任务的方式是异步提交
        print(‘-->‘,res)

  


--> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C438>
--> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C518>
--> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C5C0>
--> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C6A0>
--> <multiprocessing.pool.ApplyResult object at 0x0000000DBF84C780>

  


import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))

if __name__ == ‘__main__‘:
    p = Pool()
    for i in range(5):
        p.apply_async(task,args=(i,))#提交任务的方式是同步提交
    p.close()#关闭池子 不能再往里面添加任务
    p.join()#进程池设置成阻塞 任务完成了主进程成才能关闭

  



0 : 10100
1 : 624
2 : 8288
3 : 9532
4 : 10100

  



进程池总结:
p = Pool()实例化的时候进程的个数 默认值是cpu的个数u,或者设置成cpu+1
提交任务:
同步提交:apply(函数名,args =())
#有返回值,返回值是子函数逇返回值
#一个任务接着一个任务按顺序同步执行,没有任何并发的结果
异步提交:apply_async
#返回值是任务提交的结果
#p.close()
#p.join()
#必须先close()再join(),p设置成阻塞,直到p中所有的进程都执行完毕,才结束主进程

#这种法法取值,与同步提交没有区别
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))
    return num**2

if __name__ == ‘__main__‘:
    p = Pool()
    for i in range(5):
        res = p.apply_async(task,args=(i,))#提交任务的方式是同步提交
        print(res.get())

  


 0: 8836
    0
    1: 9248
    1
    2: 9780
    4
    3: 2588
    9
    4: 8836
    16

  



#将计算的结果放进列表
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))
    return num**2

if __name__ == ‘__main__‘:
    p = Pool()
    res_lsit = []
    for i in range(5):
        res = p.apply_async(task,args=(i,))#提交任务的方式是同步提交
        res_lsit.append(res)#将计算的结果放进列表
    for res in res_lsit:
        print(res.get())

  



0 : 9424
0
1 : 912
1
2 : 8572
4
3 : 7436
9
4 : 9424
16

  



#p.map(函数名,参数)
import time,os
from multiprocessing import Pool

def task(num):
    time.sleep(1)
    print(‘%s : %s ‘%(num,os.getpid()))
    return num**2

if __name__ == ‘__main__‘:
    p = Pool()
    p.map(task,range(5))

  



0 : 2880
1 : 8388
2 : 7888
3 : 9540
4 : 2880

  




















































































































































以上是关于第35篇 进程之间的通信 Queue Pipe 进程池Pool,p.apply()方法,p.apply_async()方法的主要内容,如果未能解决你的问题,请参考以下文章

《Python》进程之间的通信(IPC)进程之间的数据共享进程池

linux学习之进程篇

进程间通信的4种方式

8-4消息队列pipe

同一父进程下的子进程之间的通信(pipe通信)

pipe和queue.py