Python进程之multiprocessing模块

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python进程之multiprocessing模块相关的知识,希望对你有一定的参考价值。

python的multiprocessing模块是跨平台的多进程模块,multiprocessing具有创建子进程,进程间通信,队列,事件,锁等功能,multiprocessing模块包含Process,Queue,Pipe,Lock等多个组件。

1、Process

创建进程的类

Process([group [, target [, name [, args [, kwargs]]]]])

参数介绍:
group参数未使用,值始终为None
target表示调用对象,即子进程要执行的任务
args表示调用对象的位置参数元组,args=()
kwargs表示调用对象的字典,kwargs={‘key‘:‘value‘}
name为子进程的名称

Note:需要使用关键字方式指定参数

示例1:创建单进程

from multiprocessing import Process

def func():
    print("first process")

if __name__ == ‘__main__‘:
    # 创建进程对象,主进程和子进程是异步执行的
    p = Process(target=func)
    # 开启进程
    p.start()

示例2:传参

from multiprocessing import Process

def func(*args,**kwargs):
    print("IPADDR:%s PORT:%d"%args)
    for k in kwargs:
        print("%s --> %s"%(k,kwargs[k]))

if __name__ == ‘__main__‘:
    # 创建进程对象,并传递参数
    p = Process(target=func,args=(‘127.0.0.1‘,8080),kwargs={‘key‘:‘value‘})
       # 如果主进程中的代码已经结束了,子进程还没结束,主进程会等待子进程
    # 开启进程
    p.start()

示例3:创建多进程

import os
from multiprocessing import Process

def func():
    # os模块的getpid方法可以获取当前进程的pid,getppid方法可以获取当前进程的父进程的pid
    print("子进程pid:%s,父进程pid:%s"%(os.getpid(),os.getppid()))

if __name__ == ‘__main__‘:
    p_l = []
    # 创建多个进程
    for i in range(10):
        p = Process(target=func)
        p.start()
        p_l.append(p)

    # 异步执行子进程,最后执行主进程中的代码
    for p in p_l:
        p.join()   # 阻塞,使主进程等待子进程结束
    print("------主进程------")
结果:
子进程pid:9944,父进程pid:1484
子进程pid:8932,父进程pid:1484
子进程pid:8504,父进程pid:1484
子进程pid:14884,父进程pid:1484
子进程pid:4828,父进程pid:1484
子进程pid:14644,父进程pid:1484
子进程pid:14908,父进程pid:1484
子进程pid:1980,父进程pid:1484
子进程pid:14604,父进程pid:1484
子进程pid:10008,父进程pid:1484
------主进程------

Note :因为在windows操作系统中,没有fork(),在创建子进程的时候会自动运行启动它的文件中的所有代码,因此必须将创建子进程的语句写在ifname==‘main‘:条件语句下。

示例4:使用类继承的方式创建进程

import os
from multiprocessing import Process

class MyProcess(Process):   # 必须继承Process类
    def __init__(self,arg1,arg2,arg3):
        ‘‘‘
        继承父类的初始化方法,加上自己需要的参数
        :param arg1:
        :param arg2:
        :param arg3:
        ‘‘‘
        super().__init__()
        self.arg1 = arg1
        self.arg2 = arg2
        self.arg3 = arg3

    def run(self):
        ‘‘‘
        必须要有run方法的实现
        :return:
        ‘‘‘
        print(‘子进程:%d ,父进程:%s ‘%(os.getpid(),os.getppid()),self.arg1,self.arg2,self.arg3)
        self.walk()  # walk方法在子进程中执行

    def walk(self):
        print(‘子进程:%d‘%os.getpid())

if __name__ == ‘__main__‘:
    p = MyProcess(1,2,3)
    p.start()  # 会默认调用run方法
    p.walk()   # walk方法直接在主进程中调用,并没有在子进程中执行

    print(‘父进程:%d ‘%os.getpid())

结果:
子进程:1220
父进程:1220 
子进程:2164 ,父进程:1220  1 2 3
子进程:2164

示例5:守护进程

在为开启daemon前,主进程会等待子进程结束在结束;
开启daemon后,程序会在主进程结束时结束子进程

import time
from multiprocessing import Process

def cal_time(second):
    while True:
        print("current time:%s"%time.ctime())
        time.sleep(second)

if __name__ == ‘__main__‘:
    p = Process(target=cal_time,args=(1,))
    ‘‘‘
    守护进程的作用:会随着主进程代码执行结束而结束
    守护进程要在start前设置
    守护进程中不能再开启子进程
    ‘‘‘
    p.daemon = True
    p.start()
    for i in range(10):
        time.sleep(0.2)
        print(‘*‘*i)

未开启daemon结果:子进程一直在运行
current time:Tue Feb 12 17:48:44 2019

*
**
***
****
current time:Tue Feb 12 17:48:45 2019
*****
******
*******
********
*********
current time:Tue Feb 12 17:48:46 2019
current time:Tue Feb 12 17:48:47 2019
current time:Tue Feb 12 17:48:48 2019
current time:Tue Feb 12 17:48:49 2019

开启daemon后结果:主进程结束程序就结束了
current time:Tue Feb 12 17:49:14 2019

*
**
***
****
current time:Tue Feb 12 17:49:15 2019
*****
******
*******
********
*********

示例6:属性:name,pid 方法:is_alive(),terminate()

name:查看进程名
pid:查看进程id
is_alive:查看进程是否正在运行
terminate:结束进程


import time
from multiprocessing import Process

def func():
    print("start")
    time.sleep(3)
    print("end")
if __name__ == ‘__main__‘:
    p = Process(target=func)
    p.start()
    time.sleep(3)
    print("进程名:%s,进程id:%s"%(p.name,p.pid))
    # is_alive方法查看进程是否正在运行
    print(p.is_alive())
    # terminate方法结束进程
    p.terminate()
    time.sleep(3)
    print(p.is_alive())
结果:
start
进程名:Process-1,进程id:17564
True
False

2、Lock

进程锁:当多个进程访问共享资源时,进程锁保证同一时间只能有一个任务可以进行修改,程序的运行方法有并发改为串行,这样速度慢了,但是保证了数据的安全

示例:

import os
import time
import random
from multiprocessing import Process,Lock

def func(lock,n):
    lock.acquire() #加锁
    print(‘%s: %s is running‘ % (n, os.getpid()))
    time.sleep(random.random())
    print(‘%s: %s is done‘ % (n, os.getpid()))
    lock.release() #释放
if __name__ == ‘__main__‘:
    lock=Lock()
    for i in range(3):
        p=Process(target=func,args=(lock,i))
        p.start()

3、Semaphore

信号量:Lock(锁)可以保证同一时间只能有一个任务对共享数据进行操作,而Semaphore(信号量)可以在同一时间让指定数量的进程操作共享数据。

示例:迷你唱吧

import time
import random
from multiprocessing import Process
from multiprocessing import Semaphore

‘‘‘
迷你唱吧,20个人,同一时间只能有4个人进去
‘‘‘

def sing(i,sem):
    sem.acquire() # 加锁
    print(‘%s enter the ktv‘%i)
    time.sleep(random.randint(1,10))
    print(‘%s leave the ktv‘%i)
    sem.release() # 释放

if __name__ == ‘__main__‘:
    sem = Semaphore(4)
    for i in range(20):
        p = Process(target=sing,args=(i,sem))
        p.start()

4、Event

事件:Event是进程之间的状态标记通信,因为进程不共享数据,所以事件对象需要以参数形式传递到函数中使用。

主要方法:

e = Event() # 实例化一个事件对象
e.set() # 标记变为非阻塞
e.wait() # 默认标记为阻塞,在等待的过程中,遇到非阻塞信号就继续执行
e.clear() # 标记变为阻塞
e.is_set() # 是否阻塞 True就是非阻塞,False是阻塞

示例:红绿灯

import time
import random
from multiprocessing import Event
from multiprocessing import Process

def traffic_light(e):
    while True:
        if e.is_set(): # True为绿灯
            time.sleep(3)  # 等3秒后变为红灯
            print("红灯亮")
            e.clear()
        else: # False为红灯,等3秒后变为绿灯
            time.sleep(3)
            print("绿灯亮")
            e.set()

def car(i,e):
    e.wait() # 默认是红灯
    print("%s 车通过"%i)

if __name__ == ‘__main__‘:
    e = Event()

    # 控制红绿灯的进程
    tra = Process(target=traffic_light,args=(e,))
    tra.start()

    for i in range(100):
        if i%6 == 0:
            time.sleep(random.randint(1,3))
        p = Process(target=car,args=(i,e))
        p.start()

5、Pipe

管道是进程间通信(IPC)的一种,管道是双向通信的,但它不保证数据安全
创建管道:p1,p2=Pipe()

主要方法:

send():发送数据
recv():接收数据
close():关闭

示例:

def func(p):
    foo,son = p
    foo.close() # 不使用主进程的管道一端,先行关闭
    while True:
        try:
            print(son.recv())
            # 子进程在结束数据时,如果管道无数据,且对端没有close,就会报EOFError;如果管道无数据,对端没close,进程会阻塞
        except EOFError:
            break

if __name__ == ‘__main__‘:
    foo,son = Pipe()
    p = Process(target=func,args=((foo,son),))
    p.start()
    son.close() # 不使用子进程的管道一端,先行关闭
    foo.send("hello")
    foo.send("hello")
    foo.close()

6、Queue

队列:进程之间是独立的,要实现进程间通信(IPC);multiprocessing模块支持两种形式:队列(queue)和管道(pipe),这两种方式都是使用消息传递的,且都是双向通信的,Queue = Pipe+Lock。

队列的两种创建方式:

q = Queue() # 创建队列对象,无长度限制
q1 = Queue(3) # 传参数,创建一个有最大长度限制的队列

队列的主要方法:

q.put(1) # 放入一个数据,对于无长度限制的队列来说,永不阻塞;对于有长度限制的队列来说,放满就阻塞

q.get() # 队列中有数据就取出一个数据,队列中无数据就会阻塞;遵循先进先出原则

q.qsize() # 查看队列的数据大小,不一定准确

示例1:主进程与子进程之间的通信

from multiprocessing import Process
from multiprocessing import Queue

def queue_put(q):
    q.put("123") # 子进程队列中放入一个变量

if __name__ == ‘__main__‘:
    q = Queue()
    p = Process(target=queue_put,args=(q,))
    p.start()
    print(q.get()) # 主进程获取到变量

示例2:子进程与子进程之间的通信
from multiprocessing import Process
from multiprocessing import Queue

def queue_put(q):
    q.put("123") # 子进程队列中放入一个变量

def queue_get(q):
    print(q.get()) # 另一个子进程获取到队列中的数据

if __name__ == ‘__main__‘:
    q = Queue()
    p = Process(target=queue_put,args=(q,))
    p.start()
    p1 = Process(target=queue_get,args=(q,))
    p1.start()

7、JoinableQueue

JoinableQueue也是multiprocessing模块的一种队列的实现,但它与Queue不同的是JoinableQueue允许项目的使用者通知生成者项目已经被成功处理。创建方式同Queue。
主要方法:put与get与Queue一致
? ? q.task_done():使用者使用此方法发出信号,表示q.get()的返回项目已经被处理。如果调用此方法的次数大于从队列中删除项目的数量,将引发ValueError异常
? ? q.join():生产者调用此方法进行阻塞,直到队列中所有的项目均被处理。阻塞将持续到队列中的每个项目均调用q.task_done()方法为止

示例:生产者消费者模型

import time
import random
from multiprocessing import JoinableQueue
from multiprocessing import Process

‘‘‘
程序执行流程
1、生产者生产的数据全部被消费 --> 2、生产者进程结束 --> 3、主进程代码执行结束 --> 4、消费者守护进程结束
‘‘‘
def producer(q,food):
    for i in range(5):
        q.put("%s -- %s"%(i,food))
        print("生产了 %s"%(food))
        time.sleep(random.random())
    q.join() # 2、等待消费者消费完所有数据

def consumer(q,name):
    while True:
        food = q.get()
        if food == None:break
        print("%s 吃了 %s"%(name,food))
        q.task_done() # 1、消费者每消费一个数据就返回一个task_done给生产者

if __name__ == ‘__main__‘:
    q = JoinableQueue()
    p1 = Process(target=producer,args=(q,‘youtiao‘))
    p1.start()
    p2 = Process(target=producer,args=(q,‘baozi‘))
    p2.start()
    c1 = Process(target=consumer,args=(q,‘daxiong‘))
    c1.daemon = True # 4、消费者守护进程结束
    c1.start()
    c2 = Process(target=consumer,args=(q,‘chenglei‘))
    c2.daemon = True
    c2.start()
    c3 = Process(target=consumer,args=(q,‘niu‘))
    c3.daemon = True
    c3.start()

    p1.join() # 3、等待p1执行完毕
    p2.join() # 3、等待p2执行完毕

8、Manager

Manager也是multiprocessing模块的一个类,这个类主要提供了进程间通信(IPC)的一个机制,它支持Python所有的数据类型,但不提供数据安全的机制。

示例:

from multiprocessing import Manager
from multiprocessing import Process

def func(d):
    print(d)
    d[‘num‘] -= 10

if __name__ == ‘__main__‘:

    m = Manager()
    d = m.dict({‘num‘:100})
    l = []
    for i in range(10):
        p = Process(target=func,args=(d,))

        p.start()
        # p.join() # 同步
        l.append(p)
    for j in l:
        j.join()   # 异步

结果:
{‘num‘: 100}
{‘num‘: 90}
{‘num‘: 80}
{‘num‘: 70}
{‘num‘: 60}
{‘num‘: 50}
{‘num‘: 40}
{‘num‘: 30}
{‘num‘: 20}
{‘num‘: 10}

9、Pool

在执行大量并发任务时,多进程是行之有效的手段之一,但是多进程需要注意几个问题,一是操作系统不可能无限开启进程,一般是有几个核开启几个进程,二是开启进程过多,系统资源占用过多,会导致系统运行速度变慢;那么遇到这种情况时pool(进程池)便是最好的解决方案。
Pool可以指定开启一定数量的进程(一般为CPU核数+1个)等待用户使用,当有新的请求进入时,如果池中有空闲进程,便直接开启;如果池中的进程都在使用,那么该请求就会等待,直到池中有进程结束,重用该进程。

示例1:多进程与进程池效率对比

import time
from multiprocessing import Process
from multiprocessing import Pool
def func(i):
    i -= 1

if __name__ == ‘__main__‘:
    # 计算进程池所需事件
    start1_time = time.time()  # 开始时间
    p = Pool(5)  # 进程池中创建5个进程
    p.map(func,range(100)) # 调用进程执行任务,target = func args = (1,2,3...),第二个参数要是可迭代对象
    p.close() # 不允许再向进程池中添加任务
    p.join() # 等待进程池中所有进程执行结束
    stop1_time = time.time() - start1_time # 结束时间
    print("进程池所需时间: %s "%stop1_time)

    # 计算多进程所需时间
    start2_time = time.time()  # 开始时间
    l = []
    for i in range(100):
        p1 = Process(target=func,args=(i,))
        p1.start()
        l.append(p)
    for j in l:
        j.join()
    stop2_time = time.time() - start2_time
    print("多进程所需时间: %s"%stop2_time)
结果:
进程池所需时间: 0.19990277290344238 
多进程所需时间: 1.7190303802490234

由上可知,进程池在执行大量并发任务时的效率。

主要方法:

map(self, func, iterable, chunksize=None):将func应用于iterable中的每个元素,收集结果在返回的列表中。
map_async(self, func, iterable, chunksize=None, callback=None,error_callback=None):异步的map
apply_async(self,func,args=(),kwds={},callback=None,error_callback=None):异步提交任务的机制
apply(self, func, args=(), kwds={}):同步提交任务的机制
close():不允许再提交新的任务
join():等待进程池中的进程执行结束在往下执行,此方法只能在close()或teminate()之后调用

执行apply或apply_async方法时,会返回ApplyResult类的实例对象
ApplyResult类有以下方法:
obj.get():获取进程的返回值
obj.ready():调用完成时,返回True
obj.successful():如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发ValueError异常
obj.wait([timeout]):等待结果变为可用

示例2:apply与apply_async方法

import time
from multiprocessing import Pool

‘‘‘
apply:同步提交任务的机制
apply_async:异步提交任务机制
‘‘‘

def func(i):
    time.sleep(1)
    i += 1
    print(i)

if __name__ == ‘__main__‘:
    p = Pool(5)
    res_l = []
    for i in range(20):
        # p.apply(func,args=(i,)) # 同步,执行完毕立即获取到返回值
        res = p.apply_async(func,args=(i,)) # 异步,通过get获取返回值
        res_l.append(res)
    p.close() # 不允许再提交新的任务
    p.join() # 等待进程池中的进程执行结束在往下执行
    for res in res_l:
        print(res.get()) # 使用get来获取apply_aync的结果

10、pool的call函数

在进程池中,一个进程任务结束就会返回一个结果,主进程则调用一个函数去处理这个结果,这就是回调函数。回调函数是在主进程中完成的,不能传参数,只能接受多进程中函数的返回值;

示例:请求网页

在爬虫中,使用回调比较多,爬虫将访问网页、下载网页的过程放到子进程中去做,分析数据,处理数据让回调函数去做,因为访问网页与下载网页有网络延时,而处理数据只占用很小的时间


import requests
from multiprocessing import Pool
def get(url):
    ret = requests.get(url)
    return {‘url‘:url,
            ‘status_code‘:ret.status_code,
            ‘content‘:ret.text}
def parser(dic):
    print(dic[‘url‘],len(dic[‘content‘]))
    parse_url = "URL:%s  Size:%s"%(dic[‘url‘],len(dic[‘content‘]))
    with open(‘url.txt‘,‘a‘) as f:
        f.write(parse_url+‘
‘)

if __name__ == ‘__main__‘:
    url_l = [
        ‘http://www.baidu.com‘,
        ‘http://www.google.com‘,
        ‘https://zh.wikipedia.org/wiki/Wikipedia:%E9%A6%96%E9%A1%B5‘,
        ‘https://www.youtube.com/?app=desktop‘,
        ‘https://www.facebook.com/‘
    ]
    p = Pool(5)
    for i in url_l:
        p.apply_async(get,args=(i,),callback=parser)

    p.close()
    p.join()

以上是关于Python进程之multiprocessing模块的主要内容,如果未能解决你的问题,请参考以下文章

Python之进程 - multiprocessing模块

python 3 编程之多进程 multiprocessing模块

Python 多进程编程之multiprocessing--Process

Python系列之 - multiprocessing

python并发之multiprocessing

python之multiprocessing