一文搞明白Python多进程编程:multiprocessing库
Posted 思源湖的鱼
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文搞明白Python多进程编程:multiprocessing库相关的知识,希望对你有一定的参考价值。
目录
前言
本文试图搞明白Python多进程编程
这是几个姊妹篇:
一、基础知识
1、并行和并发
在学习的时候,发现并行和并发在好些地方搞混了,这是两个概念,得先明确下
(1)定义
Erlang 之父 Joe Armstrong 画了一张很可爱的图来解释这两个概念:
- 并发是两个队列交替使用一台咖啡机
- 并行是两个队列同时使用两台咖啡机
两个词很好的说明了并发和并行的区别:
- Parallel Computing:并行计算
- Concurrent programming:并发编程
(2)联系
那么并发并行和多进程多线程的关系呢?
- 多核cpu,多个进程可以并行在多个cpu中计算,当然也会存在进程切换;单核cpu,多个进程在这个单核cpu中是并发运行,根据时间片读取上下文+执行程序+保存上下文。同一个进程同一时间段只能在一个cpu中运行,如果进程数小于cpu数,那么未使用的cpu将会空闲
- 多核cpu,进程中的多线程并行执行;单核cpu,多线程在单核cpu中并发执行,根据时间片切换线程。同一个线程同一时间段只能在一个cpu内核中运行,如果线程数小于cpu内核数,那么将有多余的内核空闲
场景:
- 多核CPU——计算密集型任务:尽量使用并行计算,可以提高任务执行效率。计算密集型任务会持续地将CPU占满,此时有越多CPU来分担任务,计算速度就会越快,这是并行的用武之地
- 单核CPU——计算密集型任务:此时的任务已经把CPU资源100%消耗了,就没必要使用并行计算,毕竟硬件障碍摆在那里
- 单核CPU——I/O密集型任务:I/O密集型任务在任务执行时需要经常调用磁盘、屏幕、键盘等外设,由于调用外设时CPU会空闲,所以CPU的利用率并不高,此时使用多线程程序,只是便于人机交互。计算效率提升不大。
- 多核CPU——I/O密集型任务:同单核CPU——I/O密集型任务
总结下:
- 并行从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核CPU
- 并发是一种现象:同时运行多个程序或多个任务需要被处理的现象,这些任务可能是并行执行的,也可能是串行执行的,和CPU核心数无关,是操作系统进程调度和CPU上下文切换达到的结果
2、进程和线程
(1)定义
1、进程
- 进程是程序的一次执行过程,是一个动态概念,是程序在执行过程中分配和管理资源的基本单位
- 在面向线程设计的系统(如当代多数操作系统、Linux 2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器
- 进程拥有自己独立的内存空间,所属线程可以访问进程的空间
- 程序本身只是指令、数据及其组织形式的描述,进程才是程序的真正运行实例
2、线程
- 线程是CPU调度和分派的基本单位,它可与同属一个进程的其他的线程共享进程所拥有的全部资源
- 当前的操作系统是面向线程的,即以线程为基本运行单位,并按线程分配CPU
(2)联系
线程是进程的一部分,一个线程只能属于一个进程,而一个进程可以有多个线程,且至少有一个线程
可以看个图
区别:理解它们的差别,从资源使用的角度出发。(所谓的资源就是计算机里的中央处理器,内存,文件,网络等等)
-
根本区别:进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位
-
在开销方面:每个进程都有独立的代码和数据空间(程序上下文),程序之间的切换会有较大的开销;线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的开销小
-
所处环境:在操作系统中能同时运行多个进程(程序);而在同一个进程(程序)中有多个线程同时执行(通过CPU调度,在每个时间片中只有一个线程执行)
-
内存分配方面:系统在运行的时候会为每个进程分配不同的内存空间;而对线程而言,除了CPU外,系统不会为线程分配内存(线程所使用的资源来自其所属进程的资源),线程组之间只能共享资源
包含关系:
- 没有线程的进程可以看做是单线程的,如果一个进程内有多个线程,则执行过程不是一条线的,而是多条线(线程)共同完成的
- 线程是进程的一部分,所以线程也被称为轻量级进程
3、全局解释器锁GIL
GIL是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程
Python的Cpython解释器(普遍使用的解释器)使用GIL(因为Cpython解释器是非线程安全的),在一个Python解释器进程内可以执行多线程程序,但每次一个线程执行时就会获得全局解释器锁,使得别的线程只能等待,由于GIL几乎释放的同时就会被原线程马上获得,那些等待线程可能刚唤醒,所以经常造成线程不平衡享受CPU资源,此时多线程的效率比单线程还要低下
在python的官方文档里,它是这样解释GIL的:
In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)
可以说它的初衷是很好的,为了保证线程间的数据安全性;但是随着时代的发展,GIL却成为了python并行计算的最大障碍,但这个时候GIL已经遍布CPython的各个角落,修改它的工作量太大,特别是对这种开源性的语言来说。但幸好GIL只锁了线程,我们可以再新建解释器进程来实现并行,那这就是multiprocessing的工作了
不同版本的差异:
- 在python2.x里,GIL的释放逻辑是当前线程遇见IO操作或者ticks计数达到100时进行释放。(ticks可以看作是python自身的一个计数器,专门做用于GIL,每次释放后归零,这个计数可以通过sys.setcheckinterval 来调整)。而每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源。并且由于GIL锁存在,python里一个进程永远只能同时执行一个线程(拿到GIL的线程才能执行),这就是为什么在多核CPU上,python的多线程效率并不高。
- 在python3.x中,GIL不使用ticks计数,改为使用计时器(执行时间达到阈值后,当前线程释放GIL),这样对CPU密集型程序更加友好,但依然没有解决GIL导致的同一时间只能执行一个线程的问题,所以效率依然不尽如人意。
二、multiprocessing库
multiprocessing是python里的多进程包,在 Python 2.6 版本中加入的。通过它,我们可以在python程序里建立多进程来执行任务,从而进行并行计算。官方文档如下所述:
The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.
1、各个接口
(1)创建进程(Process)
multiprocessing模块提供了一个Process类可以创建进程对象
创建进程有两种方式:
- 第一种通过Process类直接创建,参数target指定子进程要执行的程序
- 第二种通过继承Process类来实现。
我们先用第一种方式创建子进程,子进程会将传递给它的参数扩大一倍,代码如下:
#-*- coding:utf8 -*-
import os
from multiprocessing import Process, current_process
def doubler(number):
result = number * 2
# 获取子进程ID
proc_id = os.getpid()
# 获取子进程名称
proc_name = current_process().name
print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
procs = []
# 父进程ID和名称
print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))
for num in numbers:
# 创建子进程
proc = Process(target=doubler, args=(num,))
procs.append(proc)
# 启动子进程
proc.start()
# join方法会让父进程等待子进程结束后再执行
for proc in procs:
proc.join()
print("Done.")
第二种方式通过继承Process类,并重写run方法:
class MyProcess(Process):
def __init__(self, number):
# 必须调用父类的init方法
super(MyProcess, self).__init__()
self.number = number
def run(self):
result = self.number * 2
# 获取子进程ID
# self.pid
proc_id = os.getpid()
# 获取子进程名称
# self.name
proc_name = current_process().name
print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
procs = []
# 父进程的ID和名称
print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))
for num in numbers:
# 创建子进程
proc = MyProcess(num)
procs.append(proc)
# 启动子进程,启动一个新进程实际就是执行本进程对应的run方法
proc.start()
# join方法会让父进程等待子进程结束后再执行
for proc in procs:
proc.join()
print("Done.")
(2)进程锁(Lock)
multiprocessing模块和threading模块一样也支持锁。通过acquire获取锁,执行操作后通过release释放锁。
#-*- coding:utf8 -*-
from multiprocessing import Process, Lock
def printer(item, lock):
# 获取锁
lock.acquire()
try:
print(item)
except Exception as e:
print(e)
else:
print('no exception.')
finally:
# 释放锁
lock.release()
if __name__ == '__main__':
# 实例化全局锁
lock = Lock()
items = ['php', 'Python', 'Java']
procs = []
for item in items:
proc = Process(target=printer, args=(item, lock))
procs.append(proc)
proc.start()
for proc in procs:
proc.join()
print('Done.')
(3)进程池(Pool)
Pool类表示工作进程的池子,它可以提供指定数量的进程供用户调用,当有请求提交到进程池时,如果进程池有空闲进程或进程数还没到达指定上限,就会分配一个进程响应请求,否则请求只能等待。Pool类主要在执行目标多且需要控制进程数量的情况下使用,如果目标少且不用控制进程数量可以使用Process类。
进程池可以通过map
和apply_async
方法来调用执行代码,首先我们来看map
方法:
#-*- coding:utf8 -*-
import os
from multiprocessing import Pool, current_process
def doubler(number):
result = number * 2
proc_id = os.getpid()
proc_name = current_process().name
print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
pool = Pool(processes=3)
pool.map(doubler, numbers)
# 关闭pool使其不再接受新的任务
pool.close()
# 关闭pool,结束工作进程,不在处理未完成的任务
# pool.terminate()
# 主进程阻塞,结束工作进程,不再处理未完成的任务,join方法要在close或terminate之后使用
pool.join()
print('Done')
map
只能向处理函数传递一个参数。
下面来看一下apply
/apply_async
函数,apply
函数是阻塞的,apply_async
函数是非阻塞的,这里我们以apply_async
函数为例:
#-*- coding:utf8 -*-
import os, time
from multiprocessing import Pool, current_process
def doubler(number, parent_proc_id, parent_proc_name):
result = number * 2
proc_id = os.getpid()
proc_name = current_process().name
# 设置等待时间,可以验证apply和apply_async的阻塞和非阻塞
time.sleep(2)
print('parent_proc_id:{0} parent_proc_name:{1} proc_id:{2} proc_name:{3} number:{4} result:{5}'.format(parent_proc_id, parent_proc_name, proc_id, proc_name, number, result))
if __name__ == '__main__':
numbers = [5, 10, 15, 20, 25]
parent_proc_id = os.getpid()
parent_proc_name = current_process().name
pool = Pool(processes=3)
for num in numbers:
# 非阻塞
pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))
# 阻塞其它进程
# pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))
# 关闭pool使其不再接受新的任务
pool.close()
# 关闭pool,结束工作进程,不在处理未完成的任务
# pool.terminate()
# 主进程阻塞,结束工作进程,不再处理未完成的任务,join方法要在close或terminate之后使用
pool.join()
print('Done')
(4)进程间通信(Pipe、Queue
进程间通信的方式一般有管道(Pipe)、信号(Signal)、消息队列(Message)、信号量(Semaphore)、共享内存(Shared Memory)、套接字(Socket)等。这里我们着重讲一下在Python多进程编程中常用的进程方式multiprocessing.Pipe
函数和multiprocessing.Queue
类。
1、Pipe
multiprocessing.Pipe()
即管道模式,调用Pipe()
方法返回管道的两端的Connection。Pipe方法返回(conn1, conn2)
代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发;duplex为False,conn1只负责接受消息,conn2只负责发送消息。send()
和recv()
方法分别是发送和接受消息的方法。一个进程从Pipe某一端输入对象,然后被Pipe另一端的进程接收,单向管道只允许管道一端的进程输入另一端的进程接收,不可以反向通信;而双向管道则允许从两端输入和从两端接收。
#-*- coding:utf8 -*-
import os, time
from multiprocessing import Process, Pipe, current_process
def proc1(pipe, data):
for msg in range(1, 6):
print('{0} 发送 {1}'.format(current_process().name, msg))
pipe.send(msg)
time.sleep(1)
pipe.close()
def proc2(pipe, length):
count = 0
while True:
count += 1
if count == length:
pipe.close()
try:
# 如果没有接收到数据recv会一直阻塞,如果管道被关闭,recv方法会抛出EOFError
msg = pipe.recv()
print('{0} 接收到 {1}'.format(current_process().name, msg))
except Exception as e:
print(e)
break
if __name__ == '__main__':
conn1, conn2 = Pipe(True)
data = range(0, 6)
length = len(data)
proc1 = Process(target=proc1, args=(conn1, data))
proc2 = Process(target=proc2, args=(conn2, length))
proc1.start()
proc2.start()
proc1.join()
proc2.join()
print('Done.')
2、Queue
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。Queue的使用主要是一边put()
,一边get()
,但是Queue可以是多个Process进行put()
操作,也可以是多个Process进行get()
操作。 put方法用来插入数据到队列中,put方法还有两个可选参数:block和timeout。如果block为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果block为False,但该Queue已满,会立即抛出Queue.Full异常。 get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:block和timeout。如果block为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果block为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出Queue.Empty异常。
在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:
#-*- coding:utf8 -*-
import os, time, random
from multiprocessing import Process, Queue
def write(q):
print('Process to write: %s' % os.getpid())
for val in range(0, 6):
print('Put %s to queue...' % val)
q.put(val)
time.sleep(random.random())
def read(q):
print('Process to read: %s' % os.getpid())
while True:
try:
val = q.get(block=True, timeout=5)
print('Get %s from queue.' % val)
except Exception as e:
if q.empty():
print('队列消费完毕.')
break
if __name__ == '__main__':
q = Queue()
proc1 = Process(target=write, args=(q,))
proc2 = Process(target=read, args=(q,))
proc1.start()
proc2.start()
proc1.join()
proc2.join()
# 如果proc2不break的话会一直阻塞,不调用join调用terminate方法可以终止进程
# proc2.terminate()
print('Done.')
Pipe的读写效率要高于Queue。那么我们如何的选择它们呢?
- 如果你的环境是多生产者和消费者,那么你只能是选择queue队列
- 如果两个进程间处理的逻辑简单,但是就是要求绝对的速度,那么pipe是个好选择
(5)共享内存(Value、Array)
共享内存
主要通过 Value 或者 Array 来实现。常见的共享的有以下几种:
In : from multiprocessing.sharedctypes import typecode_to_type
In : typecode_to_type
Out:
{'B': ctypes.c_ubyte,
'H': ctypes.c_ushort,
'I': ctypes.c_uint,
'L': ctypes.c_ulong,
'b': ctypes.c_byte,
'c': ctypes.c_char,
'd': ctypes.c_double,
'f': ctypes.c_float,
'h': ctypes.c_short,
'i': ctypes.c_int,
'l': ctypes.c_long,
'u': ctypes.c_wchar}
而且共享的时候还可以给 Value 或者 Array 传递 lock 参数来决定是否带锁,如果不指定默认为 RLock。
我们看一个例子:
from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_bool, c_double
lock = Lock()
class Point(Structure):
_fields_ = [('x', c_double), ('y', c_double)]
def modify(n, b, s, arr, A):
n.value **= 2
b.value = True
s.value = s.value.upper()
arr[0] = 10
for a in A:
a.x **= 2
a.y **= 2
n = Value('i', 7)
b = Value(c_bool, False, lock=False)
s = Array('c', 'hello world', lock=lock)
arr = Array('i', range(5), lock=True)
A = Array(Point, [(1.875, -6.25), (-5.75, 2.0)], lock=lock)
p = Process(target=modify, args=(n, b, s, arr, A))
p.start()
p.join()
print n.value
print b.value
print s.value
print arr[:]
print [(a.x, a.y) for a in A]
主要是为了演示用法。
有 2 点需要注意:
- 并不是只支持
typecode_to_type
中指定那些类型,只要在 ctypes 里面的类型就可以。 - arr 是一个 int 的数组,但是和 array 模块生成的数组以及 list 是不一样的,它是一个 SynchronizedArray 对象,支持的方法很有限,比如 append/extend 等方法是没有的。
输出结果如下:
❯ python shared_memory.py
49
True
HELLO WORLD
[10, 1, 2, 3, 4]
[(3.515625, 39.0625), (33.0625, 4.0)]
(6)服务器进程(Manager)
一个 multiprocessing.Manager
对象会控制一个服务器进程,其他进程可以通过代理的方式来访问这个服务器进程。 常见的共享方式有以下几种:
- Namespace。创建一个可分享的命名空间。
- Value/Array。和上面共享 ctypes 对象的方式一样。
- dict/list。创建一个可分享的 dict/list,支持对应数据结构的方法。
- Condition/Event/Lock/Queue/Semaphore。创建一个可分享的对应同步原语的对象。
看一个例子:
from multiprocessing import Manager, Process
def modify(ns, lproxy, dproxy):
ns.a **= 2
lproxy.extend(['b', 'c'])
dproxy['b'] = 0
manager = Manager()
ns = manager.Namespace(一文搞明白Python多线程编程:threading库