multiprocess advanced
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了multiprocess advanced相关的知识,希望对你有一定的参考价值。
Process中的方法:
守护进程:在start之前加入 daemon=True 子程序就可以在主程序运行完代码后结束。 p.s. 守护进程中不能有子进程
Pool
from multiprocessing import Pool import os,time,random def work(n): print(‘%s is working‘ %os.getpid()) # time.sleep(random.randint(1,3)) return n**2 if __name__ == ‘__main__‘: p=Pool(2) objs=[] for i in range(10): # 同步调用:提交完任务后,在原地等待任务结束,一旦结束可以立刻拿到结果 # res=p.apply(work,args=(i,)) # print(res) # 异步调用:提交完任务后,不会在原地等待任务结束,会继续提交下一次任务,等到所有任务都结束后,才get结果 obj=p.apply_async(work,args=(i,)) objs.append(obj) p.close() p.join() for obj in objs: print(obj.get())
回调式:obj=p.apply_async(get,args=(url,),callback=parse) callback参数将子程序中get函数的结果给了在主程序中运行的parse函数,最终放回结果。
Lock
lock=Lock()
from multiprocess import process, lock def task(lock): code1 balabala lock.acquire() code2 balabala lock.release() or with lock: code1 balabala lock=Lcok() for i in range(5) p=Process(target=task,args=(lock,)
排队的是lock部分的代码! 并不影响子程序的创建和其他上部分代码的运行。lock的效果比join好,一个子程序完了join再开另一个子程序,浪费时间。如果用lock锁住里面的代码,就是同时开启多个子进程,但排队等lock。
Queue:
from multiprocess import process, Queue q = Queue(3) q.put(content) q.get()
q.put_nowait() 满了也不等,抛出异常
q.get_nowait() 同理
通过Queue可以实现程序间的通信
e.g.
from multiprocessing import Queue,Process import time,random def producer(name,q): for i in range(10): time.sleep(random.randint(1,3)) res=‘泔水%s‘ %i q.put(res) print(‘厨师 %s 生产了 %s‘ %(name,res)) def consumer(name,q): while True: res=q.get() if res is None:break time.sleep(random.randint(1,3)) print(‘%s 吃了 %s‘ %(name,res)) if __name__ == ‘__main__‘: q=Queue() p1=Process(target=producer,args=(‘egon‘,q)) c1=Process(target=consumer,args=(‘alex‘,q)) p1.start() c1.start() p1.join() q.put(None
JoinableQueue
from multiprocessing import JoinableQueue,Process import time,random def producer(name,q,food): for i in range(1): time.sleep(random.randint(1,3)) res=‘%s%s‘ %(food,i) q.put(res) print(‘厨师 %s 生产了 %s‘ %(name,res)) q.join() # producer等待q清空再结束 def consumer(name,q): while True: res=q.get() if res is None:break time.sleep(random.randint(1,3)) print(‘%s 吃了 %s‘ %(name,res)) q.task_done() #queue中 -1 if __name__ == ‘__main__‘: q=JoinableQueue() p1=Process(target=producer,args=(1,q,‘泔水‘)) p2=Process(target=producer,args=(2,q,‘骨头‘)) p3=Process(target=producer,args=(3,q,‘馒头‘)) c1=Process(target=consumer,args=(‘alex‘,q)) c2=Process(target=consumer,args=(‘wupeiqi‘,q)) c1.daemon=True c2.daemon=True p1.start() p2.start() p3.start() c1.start() c2.start() p1.join() p2.join() p3.join()
Manager
共享内存
from multiprocessing import Manager,Process,Lock def work(d,lock): with lock: temp=d[‘count‘] d[‘count‘]=temp-1 if __name__ == ‘__main__‘: m=Manager() d=m.dict({"count":100}) # 也可以是m.list() lock=Lock() p_l=[] for i in range(100): p=Process(target=work,args=(d,lock)) p_l.append(p) p.start() for obj in p_l: obj.join() print(d)
以上是关于multiprocess advanced的主要内容,如果未能解决你的问题,请参考以下文章
python通过multiprocessing 实现带回调函数的异步调用的代码
Python多进程 - subprocess & multiprocess
multiprocessing:maxtasksperchild和chunksize冲突?