multiprocess advanced

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了multiprocess advanced相关的知识,希望对你有一定的参考价值。

Process中的方法:

  守护进程:在start之前加入 daemon=True 子程序就可以在主程序运行完代码后结束。  p.s.  守护进程中不能有子进程

 

Pool

技术分享
from  multiprocessing import Pool
import os,time,random
def work(n):
    print(%s is working %os.getpid())
    # time.sleep(random.randint(1,3))
    return n**2


if __name__ == __main__:
    p=Pool(2)
    objs=[]
    for i in range(10):
        # 同步调用:提交完任务后,在原地等待任务结束,一旦结束可以立刻拿到结果
        # res=p.apply(work,args=(i,))
        # print(res)

        # 异步调用:提交完任务后,不会在原地等待任务结束,会继续提交下一次任务,等到所有任务都结束后,才get结果
        obj=p.apply_async(work,args=(i,))
        objs.append(obj)

    p.close()
    p.join()
    for obj in objs:
        print(obj.get())
View Code

 

  回调式:obj=p.apply_async(get,args=(url,),callback=parse) callback参数将子程序中get函数的结果给了在主程序中运行的parse函数,最终放回结果。

  

 

Lock

lock=Lock()

from multiprocess import process, lock
def task(lock):
    code1 balabala
    lock.acquire()
    code2 balabala
    lock.release()

    or 
    with lock:
        code1 balabala

lock=Lcok()
for i in range(5)
    p=Process(target=task,args=(lock,)

排队的是lock部分的代码! 并不影响子程序的创建和其他上部分代码的运行。lock的效果比join好,一个子程序完了join再开另一个子程序,浪费时间。如果用lock锁住里面的代码,就是同时开启多个子进程,但排队等lock。

 

Queue:

from multiprocess import process, Queue
    q = Queue(3)
    q.put(content)
    q.get()

q.put_nowait() 满了也不等,抛出异常
q.get_nowait() 同理

通过Queue可以实现程序间的通信

e.g.

技术分享
from multiprocessing import Queue,Process
import time,random
def producer(name,q):
    for i in range(10):
        time.sleep(random.randint(1,3))
        res=泔水%s %i
        q.put(res)
        print(厨师 %s 生产了 %s %(name,res))
def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print(%s 吃了 %s %(name,res))

if __name__ == __main__:
    q=Queue()
    p1=Process(target=producer,args=(egon,q))
    c1=Process(target=consumer,args=(alex,q))

    p1.start()
    c1.start()
    p1.join()
    q.put(None
View Code

 

JoinableQueue

from multiprocessing import JoinableQueue,Process
import time,random
def producer(name,q,food):
    for i in range(1):
        time.sleep(random.randint(1,3))
        res=%s%s %(food,i)
        q.put(res)
        print(厨师 %s 生产了 %s %(name,res))
    q.join()   # producer等待q清空再结束

def consumer(name,q):
    while True:
        res=q.get()
        if res is None:break
        time.sleep(random.randint(1,3))
        print(%s 吃了 %s %(name,res))
        q.task_done()   #queue中 -1
if __name__ == __main__:
    q=JoinableQueue()
    p1=Process(target=producer,args=(1,q,泔水))
    p2=Process(target=producer,args=(2,q,骨头))
    p3=Process(target=producer,args=(3,q,馒头))
    c1=Process(target=consumer,args=(alex,q))
    c2=Process(target=consumer,args=(wupeiqi,q))
    c1.daemon=True
    c2.daemon=True

    p1.start()
    p2.start()
    p3.start()
    c1.start()
    c2.start()

    p1.join()
    p2.join()
    p3.join()

 

Manager

共享内存

from multiprocessing import Manager,Process,Lock

def work(d,lock):
    with lock:
        temp=d[count]
        d[count]=temp-1


if __name__ == __main__:
    m=Manager()
    d=m.dict({"count":100})   # 也可以是m.list()
    lock=Lock()
    p_l=[]
    for i in range(100):
        p=Process(target=work,args=(d,lock))
        p_l.append(p)
        p.start()
    for obj in p_l:
        obj.join()

    print(d)

 


以上是关于multiprocess advanced的主要内容,如果未能解决你的问题,请参考以下文章

python通过multiprocessing 实现带回调函数的异步调用的代码

Python多进程 - subprocess & multiprocess

multiprocessing:maxtasksperchild和chunksize冲突?

如何在继续之前等待所有 multiprocessing.Processes 完成?

进程之multiprocessing模块代码篇

102-advanced-代码分割