一文搞明白Python多进程编程:multiprocessing库

Posted 思源湖的鱼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了一文搞明白Python多进程编程:multiprocessing库相关的知识,希望对你有一定的参考价值。

前言

本文试图搞明白Python多进程编程

这是几个姊妹篇:

一、基础知识

1、并行和并发

在学习的时候,发现并行和并发在好些地方搞混了,这是两个概念,得先明确下

(1)定义

Erlang 之父 Joe Armstrong 画了一张很可爱的图来解释这两个概念:

  • 并发是两个队列交替使用一台咖啡机
  • 并行是两个队列同时使用两台咖啡机


两个词很好的说明了并发和并行的区别:

  • Parallel Computing:并行计算
  • Concurrent programming:并发编程

(2)联系

那么并发并行和多进程多线程的关系呢?

  • 多核cpu,多个进程可以并行在多个cpu中计算,当然也会存在进程切换;单核cpu,多个进程在这个单核cpu中是并发运行,根据时间片读取上下文+执行程序+保存上下文。同一个进程同一时间段只能在一个cpu中运行,如果进程数小于cpu数,那么未使用的cpu将会空闲
  • 多核cpu,进程中的多线程并行执行;单核cpu,多线程在单核cpu中并发执行,根据时间片切换线程。同一个线程同一时间段只能在一个cpu内核中运行,如果线程数小于cpu内核数,那么将有多余的内核空闲

场景:

  • 多核CPU——计算密集型任务:尽量使用并行计算,可以提高任务执行效率。计算密集型任务会持续地将CPU占满,此时有越多CPU来分担任务,计算速度就会越快,这是并行的用武之地
  • 单核CPU——计算密集型任务:此时的任务已经把CPU资源100%消耗了,就没必要使用并行计算,毕竟硬件障碍摆在那里
  • 单核CPU——I/O密集型任务:I/O密集型任务在任务执行时需要经常调用磁盘、屏幕、键盘等外设,由于调用外设时CPU会空闲,所以CPU的利用率并不高,此时使用多线程程序,只是便于人机交互。计算效率提升不大。
  • 多核CPU——I/O密集型任务:同单核CPU——I/O密集型任务

总结下:

  • 并行从代码层次上强依赖于多进程/多线程代码,从硬件角度上则依赖于多核CPU
  • 并发是一种现象:同时运行多个程序或多个任务需要被处理的现象,这些任务可能是并行执行的,也可能是串行执行的,和CPU核心数无关,是操作系统进程调度和CPU上下文切换达到的结果

2、进程和线程

(1)定义

1、进程

  • 进程是程序的一次执行过程,是一个动态概念,是程序在执行过程中分配和管理资源的基本单位
  • 在面向线程设计的系统(如当代多数操作系统、Linux 2.6及更新的版本)中,进程本身不是基本运行单位,而是线程的容器
  • 进程拥有自己独立的内存空间,所属线程可以访问进程的空间
  • 程序本身只是指令、数据及其组织形式的描述,进程才是程序的真正运行实例

2、线程

  • 线程是CPU调度和分派的基本单位,它可与同属一个进程的其他的线程共享进程所拥有的全部资源
  • 当前的操作系统是面向线程的,即以线程为基本运行单位,并按线程分配CPU

(2)联系

线程是进程的一部分,一个线程只能属于一个进程,而一个进程可以有多个线程,且至少有一个线程

可以看个图

区别:理解它们的差别,从资源使用的角度出发。(所谓的资源就是计算机里的中央处理器,内存,文件,网络等等)

  • 根本区别:进程是操作系统资源分配的基本单位,而线程是任务调度和执行的基本单位

  • 在开销方面:每个进程都有独立的代码和数据空间(程序上下文),程序之间的切换会有较大的开销;线程共享代码和数据空间,每个线程都有自己独立的运行栈和程序计数器(PC),线程之间切换的开销小

  • 所处环境:在操作系统中能同时运行多个进程(程序);而在同一个进程(程序)中有多个线程同时执行(通过CPU调度,在每个时间片中只有一个线程执行)

  • 内存分配方面:系统在运行的时候会为每个进程分配不同的内存空间;而对线程而言,除了CPU外,系统不会为线程分配内存(线程所使用的资源来自其所属进程的资源),线程组之间只能共享资源

包含关系:

  • 没有线程的进程可以看做是单线程的,如果一个进程内有多个线程,则执行过程不是一条线的,而是多条线(线程)共同完成的
  • 线程是进程的一部分,所以线程也被称为轻量级进程

3、全局解释器锁GIL

GIL是计算机程序设计语言解释器用于同步线程的一种机制,它使得任何时刻仅有一个线程在执行。即便在多核心处理器上,使用 GIL 的解释器也只允许同一时间执行一个线程

Python的Cpython解释器(普遍使用的解释器)使用GIL(因为Cpython解释器是非线程安全的),在一个Python解释器进程内可以执行多线程程序,但每次一个线程执行时就会获得全局解释器锁,使得别的线程只能等待,由于GIL几乎释放的同时就会被原线程马上获得,那些等待线程可能刚唤醒,所以经常造成线程不平衡享受CPU资源,此时多线程的效率比单线程还要低下

在python的官方文档里,它是这样解释GIL的:

In CPython, the global interpreter lock, or GIL, is a mutex that prevents multiple native threads from executing Python bytecodes at once. This lock is necessary mainly because CPython’s memory management is not thread-safe. (However, since the GIL exists, other features have grown to depend on the guarantees that it enforces.)

可以说它的初衷是很好的,为了保证线程间的数据安全性;但是随着时代的发展,GIL却成为了python并行计算的最大障碍,但这个时候GIL已经遍布CPython的各个角落,修改它的工作量太大,特别是对这种开源性的语言来说。但幸好GIL只锁了线程,我们可以再新建解释器进程来实现并行,那这就是multiprocessing的工作了

不同版本的差异:

  • 在python2.x里,GIL的释放逻辑是当前线程遇见IO操作或者ticks计数达到100时进行释放。(ticks可以看作是python自身的一个计数器,专门做用于GIL,每次释放后归零,这个计数可以通过sys.setcheckinterval 来调整)。而每次释放GIL锁,线程进行锁竞争、切换线程,会消耗资源。并且由于GIL锁存在,python里一个进程永远只能同时执行一个线程(拿到GIL的线程才能执行),这就是为什么在多核CPU上,python的多线程效率并不高。
  • 在python3.x中,GIL不使用ticks计数,改为使用计时器(执行时间达到阈值后,当前线程释放GIL),这样对CPU密集型程序更加友好,但依然没有解决GIL导致的同一时间只能执行一个线程的问题,所以效率依然不尽如人意。

二、multiprocessing库

multiprocessing是python里的多进程包,在 Python 2.6 版本中加入的。通过它,我们可以在python程序里建立多进程来执行任务,从而进行并行计算。官方文档如下所述:

The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads.

1、各个接口

(1)创建进程(Process)

multiprocessing模块提供了一个Process类可以创建进程对象

创建进程有两种方式:

  • 第一种通过Process类直接创建,参数target指定子进程要执行的程序
  • 第二种通过继承Process类来实现。

我们先用第一种方式创建子进程,子进程会将传递给它的参数扩大一倍,代码如下:

#-*- coding:utf8 -*-
import os
from multiprocessing import Process, current_process

def doubler(number):
    result = number * 2
    # 获取子进程ID
    proc_id = os.getpid()
    # 获取子进程名称
    proc_name = current_process().name
    print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []  
    # 父进程ID和名称
    print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))

    for num in numbers:
        # 创建子进程
        proc = Process(target=doubler, args=(num,))
        procs.append(proc)
        # 启动子进程
        proc.start()

    # join方法会让父进程等待子进程结束后再执行
    for proc in procs:
        proc.join()

    print("Done.")

第二种方式通过继承Process类,并重写run方法:

class MyProcess(Process):
    def __init__(self, number):
        # 必须调用父类的init方法
        super(MyProcess, self).__init__()
        self.number = number

    def run(self):
        result = self.number * 2
        # 获取子进程ID
        # self.pid
        proc_id = os.getpid()
        # 获取子进程名称
        # self.name
        proc_name = current_process().name
        print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    procs = []
    # 父进程的ID和名称
    print('parent_proc_id:{0} parent_proc_name:{1}'.format(os.getpid(), current_process().name))

    for num in numbers:
        # 创建子进程
        proc = MyProcess(num)
        procs.append(proc)
        # 启动子进程,启动一个新进程实际就是执行本进程对应的run方法
        proc.start()

    # join方法会让父进程等待子进程结束后再执行
    for proc in procs:
        proc.join()

    print("Done.")

(2)进程锁(Lock)

multiprocessing模块和threading模块一样也支持锁。通过acquire获取锁,执行操作后通过release释放锁。

#-*- coding:utf8 -*-
from multiprocessing import Process, Lock

def printer(item, lock):
    # 获取锁
    lock.acquire()
    try:
        print(item)
    except Exception as e:
        print(e)
    else:
        print('no exception.')
    finally:
        # 释放锁
        lock.release()

if __name__ == '__main__':
    # 实例化全局锁
    lock = Lock()
    items = ['php', 'Python', 'Java']
    procs = []

    for item in items:
        proc = Process(target=printer, args=(item, lock))
        procs.append(proc)
        proc.start()

    for proc in procs:
        proc.join()

    print('Done.')

(3)进程池(Pool)

Pool类表示工作进程的池子,它可以提供指定数量的进程供用户调用,当有请求提交到进程池时,如果进程池有空闲进程或进程数还没到达指定上限,就会分配一个进程响应请求,否则请求只能等待。Pool类主要在执行目标多且需要控制进程数量的情况下使用,如果目标少且不用控制进程数量可以使用Process类。

进程池可以通过mapapply_async方法来调用执行代码,首先我们来看map方法:

#-*- coding:utf8 -*-
import os
from multiprocessing import Pool, current_process

def doubler(number):
    result = number * 2
    proc_id = os.getpid()
    proc_name = current_process().name
    print('proc_id:{0} proc_name:{1} result:{2}'.format(proc_id, proc_name, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    pool = Pool(processes=3)
    pool.map(doubler, numbers)

    # 关闭pool使其不再接受新的任务
    pool.close()

    # 关闭pool,结束工作进程,不在处理未完成的任务
    # pool.terminate()

    # 主进程阻塞,结束工作进程,不再处理未完成的任务,join方法要在close或terminate之后使用
    pool.join()

    print('Done')

map只能向处理函数传递一个参数。

下面来看一下apply/apply_async函数,apply函数是阻塞的,apply_async函数是非阻塞的,这里我们以apply_async函数为例:

#-*- coding:utf8 -*-
import os, time
from multiprocessing import Pool, current_process

def doubler(number, parent_proc_id, parent_proc_name):
    result = number * 2
    proc_id = os.getpid()
    proc_name = current_process().name
    # 设置等待时间,可以验证apply和apply_async的阻塞和非阻塞
    time.sleep(2)
    print('parent_proc_id:{0} parent_proc_name:{1} proc_id:{2} proc_name:{3} number:{4} result:{5}'.format(parent_proc_id, parent_proc_name, proc_id, proc_name, number, result))

if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25]
    parent_proc_id = os.getpid()
    parent_proc_name = current_process().name
    pool = Pool(processes=3)
    for num in numbers:
        # 非阻塞
        pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))
        # 阻塞其它进程
        # pool.apply_async(doubler, (num, parent_proc_id, parent_proc_name))

    # 关闭pool使其不再接受新的任务
    pool.close()

    # 关闭pool,结束工作进程,不在处理未完成的任务
    # pool.terminate()

    # 主进程阻塞,结束工作进程,不再处理未完成的任务,join方法要在close或terminate之后使用
    pool.join()

    print('Done')

(4)进程间通信(Pipe、Queue

进程间通信的方式一般有管道(Pipe)、信号(Signal)、消息队列(Message)、信号量(Semaphore)、共享内存(Shared Memory)、套接字(Socket)等。这里我们着重讲一下在Python多进程编程中常用的进程方式multiprocessing.Pipe函数和multiprocessing.Queue类。

1、Pipe
multiprocessing.Pipe()即管道模式,调用Pipe()方法返回管道的两端的Connection。Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发;duplex为False,conn1只负责接受消息,conn2只负责发送消息。send()recv()方法分别是发送和接受消息的方法。一个进程从Pipe某一端输入对象,然后被Pipe另一端的进程接收,单向管道只允许管道一端的进程输入另一端的进程接收,不可以反向通信;而双向管道则允许从两端输入和从两端接收。

#-*- coding:utf8 -*-
import os, time
from multiprocessing import Process, Pipe, current_process

def proc1(pipe, data):
    for msg in range(1, 6):
        print('{0} 发送 {1}'.format(current_process().name, msg))
        pipe.send(msg)
        time.sleep(1)
    pipe.close()

def proc2(pipe, length):
    count = 0
    while True:
        count += 1
        if count == length:
            pipe.close()
        try:
            # 如果没有接收到数据recv会一直阻塞,如果管道被关闭,recv方法会抛出EOFError
            msg = pipe.recv()
            print('{0} 接收到 {1}'.format(current_process().name, msg))
        except Exception as e:
            print(e)
            break

if __name__ == '__main__':
    conn1, conn2 = Pipe(True)
    data = range(0, 6)
    length = len(data)
    proc1 = Process(target=proc1, args=(conn1, data))
    proc2 = Process(target=proc2, args=(conn2, length))

    proc1.start()
    proc2.start()

    proc1.join()
    proc2.join()

    print('Done.')

2、Queue
Queue是多进程安全的队列,可以使用Queue实现多进程之间的数据传递。Queue的使用主要是一边put(),一边get(),但是Queue可以是多个Process进行put()操作,也可以是多个Process进行get()操作。 put方法用来插入数据到队列中,put方法还有两个可选参数:block和timeout。如果block为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。如果超时,会抛出Queue.Full异常。如果block为False,但该Queue已满,会立即抛出Queue.Full异常。 get方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:block和timeout。如果block为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。如果block为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值;否则,如果队列为空,则立即抛出Queue.Empty异常。

在父进程中创建两个子进程,一个往Queue里写数据,一个从Queue里读数据:

#-*- coding:utf8 -*-
import os, time, random
from multiprocessing import Process, Queue

def write(q):
    print('Process to write: %s' % os.getpid())
    for val in range(0, 6):
        print('Put %s to queue...' % val)
        q.put(val)
        time.sleep(random.random())

def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        try:
            val = q.get(block=True, timeout=5)
            print('Get %s from queue.' % val)
        except Exception as e:
            if q.empty():
                print('队列消费完毕.')
                break

if __name__ == '__main__':
    q = Queue()

    proc1 = Process(target=write, args=(q,))
    proc2 = Process(target=read, args=(q,))

    proc1.start()
    proc2.start()

    proc1.join()
    proc2.join()

    # 如果proc2不break的话会一直阻塞,不调用join调用terminate方法可以终止进程
    # proc2.terminate()

    print('Done.')

Pipe的读写效率要高于Queue。那么我们如何的选择它们呢?

  • 如果你的环境是多生产者和消费者,那么你只能是选择queue队列
  • 如果两个进程间处理的逻辑简单,但是就是要求绝对的速度,那么pipe是个好选择

(5)共享内存(Value、Array)

共享内存
主要通过 Value 或者 Array 来实现。常见的共享的有以下几种:

In : from multiprocessing.sharedctypes import typecode_to_type

In : typecode_to_type
Out:
{'B': ctypes.c_ubyte,
 'H': ctypes.c_ushort,
 'I': ctypes.c_uint,
 'L': ctypes.c_ulong,
 'b': ctypes.c_byte,
 'c': ctypes.c_char,
 'd': ctypes.c_double,
 'f': ctypes.c_float,
 'h': ctypes.c_short,
 'i': ctypes.c_int,
 'l': ctypes.c_long,
 'u': ctypes.c_wchar}

而且共享的时候还可以给 Value 或者 Array 传递 lock 参数来决定是否带锁,如果不指定默认为 RLock。

我们看一个例子:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_bool, c_double

lock = Lock()


class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]


def modify(n, b, s, arr, A):
    n.value **= 2
    b.value = True
    s.value = s.value.upper()
    arr[0] = 10
    for a in A:
        a.x **= 2
        a.y **= 2


n = Value('i', 7)
b = Value(c_bool, False, lock=False)
s = Array('c', 'hello world', lock=lock)
arr = Array('i', range(5), lock=True)
A = Array(Point, [(1.875, -6.25), (-5.75, 2.0)], lock=lock)

p = Process(target=modify, args=(n, b, s, arr, A))
p.start()
p.join()

print n.value
print b.value
print s.value
print arr[:]
print [(a.x, a.y) for a in A]

主要是为了演示用法。

有 2 点需要注意:

  • 并不是只支持 typecode_to_type 中指定那些类型,只要在 ctypes 里面的类型就可以。
  • arr 是一个 int 的数组,但是和 array 模块生成的数组以及 list 是不一样的,它是一个 SynchronizedArray 对象,支持的方法很有限,比如 append/extend 等方法是没有的。

输出结果如下:

❯ python shared_memory.py
49
True
HELLO WORLD
[10, 1, 2, 3, 4]
[(3.515625, 39.0625), (33.0625, 4.0)]

(6)服务器进程(Manager)

一个 multiprocessing.Manager 对象会控制一个服务器进程,其他进程可以通过代理的方式来访问这个服务器进程。 常见的共享方式有以下几种:

  • Namespace。创建一个可分享的命名空间。
  • Value/Array。和上面共享 ctypes 对象的方式一样。
  • dict/list。创建一个可分享的 dict/list,支持对应数据结构的方法。
  • Condition/Event/Lock/Queue/Semaphore。创建一个可分享的对应同步原语的对象。

看一个例子:

from multiprocessing import Manager, Process

def modify(ns, lproxy, dproxy):
    ns.a **= 2
    lproxy.extend(['b', 'c'])
    dproxy['b'] = 0


manager = Manager()
ns = manager.Namespace(一文搞明白Python多线程编程:threading库

一文搞明白Python多线程编程:threading库

一文搞明白Python并发编程和并行编程

一文搞明白Python并发编程和并行编程

python3高级编程一文搞懂多进程

一文搞明白Python协程编程:asyncio库