Python 多进程教程

Posted 我这一刀下去你可能会挂

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python 多进程教程相关的知识,希望对你有一定的参考价值。

Python2.6版本中新添了multiprocessing模块。它最初由Jesse Noller和Richard Oudkerk定义在PEP 371中。就像你能通过threading模块衍生线程一样,multiprocessing 模块允许你衍生进程。这里用到的思想:因为你现在能衍生进程,所以你能够避免使用全局解释器锁(GIL),并且充分利用机器的多个处理器。

 

多进程包也包含一些根本不在threading 模块中的API。比如:有一个灵活的Pool类能让你在多个输入下并行化地执行函数。我们将在后面的小节讲解Pool类。我们将以multiprocessing模块的Process类开始讲解。

 


 

开始学习multiprocessing模块

 

Process这个类和threading模块中的Thread类很像。让我们创建一系列调用相同函数的进程,并且看看它是如何工作的。

 

import os

 

from multiprocessing import Process

 

def doubler(number):

    """

    A doubling function that can be used by a process

    """

    result = number * 2

    proc = os.getpid()

    print(‘{0} doubled to {1} by process id: {2}‘.format(

        number, result, proc))

 

if __name__ == ‘__main__‘:

    numbers = [5, 10, 15, 20, 25]

    procs = []

 

    for index, number in enumerate(numbers):

        proc = Process(target=doubler, args=(number,))

        procs.append(proc)

        proc.start()

 

    for proc in procs:

        proc.join()

 

对于上面的例子,我们导入Process类、创建一个叫doubler的函数。在函数中,我们将传入的数字乘上2。我们也用Python的os模块来获取当前进程的ID(pid)。这个ID将告诉我们哪个进程正在调用doubler函数。然后,在下面的代码块中,我们实例化了一系列的Process类并且启动它们。最后一个循环只是调用每个进程的join()方法,该方法告诉Python等待进程直到它结束。如果你需要结束一个进程,你可以调用它的terminate()方法。

 

当你运行上面的代码,你应该看到和下面类似的输出结果:

 

5 doubled to 10 by process id10468

10 doubled to 20 by process id10469

15 doubled to 30 by process id10470

20 doubled to 40 by process id10471

25 doubled to 50 by process id10472

 

有时候,你最好给你的进程取一个易于理解的名字 。幸运的是,Process类确实允许你访问同样的进程。让我们来看看如下例子:

 

import os

 

from multiprocessing import Process, current_process

 

 

def doubler(number):

    """

    A doubling function that can be used by a process

    """

    result = number * 2

    proc_name = current_process().name

    print(‘{0} doubled to {1} by: {2}‘.format(

        number, result, proc_name))

 

 

if __name__ == ‘__main__‘:

    numbers = [5, 10, 15, 20, 25]

    procs = []

    proc = Process(target=doubler, args=(5,))

 

    for index, number in enumerate(numbers):

        proc = Process(target=doubler, args=(number,))

        procs.append(proc)

        proc.start()

 

    proc = Process(target=doubler, name=‘Test‘, args=(2,))

    proc.start()

    procs.append(proc)

 

    for proc in procs:

        proc.join()

 

这一次,我们多导入了current_process。current_process基本上和threading模块的current_thread是类似的东西。我们用它来获取正在调用我们的函数的线程的名字。你将注意到我们没有给前面的5个进程设置名字。然后我们将第6个进程的名字设置为“Test”。

 

让我们看看我们将得到什么样的输出结果:

 

5 doubled to 10 byProcess-2

10 doubled to 20 byProcess-3

15 doubled to 30 byProcess-4

20 doubled to 40 byProcess-5

25 doubled to 50 byProcess-6

2 doubled to 4 byTest

 

输出结果说明:默认情况下,multiprocessing模块给每个进程分配了一个编号,而该编号被用来组成进程的名字的一部分。当然,如果我们给定了名字的话,并不会有编号被添加到名字中。

 

 

multiprocessing模块支持锁,它和threading模块做的方式一样。你需要做的只是导入Lock,获取它,做一些事,释放它。

 

from multiprocessing import Process, Lock

 

 

def printer(item, lock):

    """

    Prints out the item that was passed in

    """

    lock.acquire()

    try:

        print(item)

    finally:

        lock.release()

 

if __name__ == ‘__main__‘:

    lock = Lock()

    items = [‘tango‘, ‘foxtrot‘, 10]

    for item in items:

        p = Process(target=printer, args=(item, lock))

        p.start()

 

我们在这里创建了一个简单的用于打印函数,你输入什么,它就输出什么。为了避免线程之间互相阻塞,我们使用Lock对象。代码循环列表中的三个项并为它们各自都创建一个进程。每一个进程都将调用我们的函数,并且每次遍历到的那一项作为参数传入函数。因为我们现在使用了锁,所以队列中下一个进程将一直阻塞,直到之前的进程释放锁。

 

日志

 

为进程创建日志与为线程创建日志有一些不同。它们存在不同是因为Python的logging包不使用共享锁的进程,因此有可能以来自不同进程的信息作为结束的标志。让我们试着给前面的例子添加基本的日志。代码如下:

 

import logging

import multiprocessing

 

from multiprocessing import Process, Lock

 

def printer(item, lock):

    """

    Prints out the item that was passed in

    """

    lock.acquire()

    try:

        print(item)

    finally:

        lock.release()

 

if __name__ == ‘__main__‘:

    lock = Lock()

    items = [‘tango‘, ‘foxtrot‘, 10]

    multiprocessing.log_to_stderr()

    logger = multiprocessing.get_logger()

    logger.setLevel(logging.INFO)

    for item in items:

        p = Process(target=printer, args=(item, lock))

        p.start()

 

最简单的添加日志的方法通过推送它到stderr实现。我们能通过调用thelog_to_stderr() 函数来实现该方法。然后我们调用get_logger 函数获得一个logger实例,并将它的日志等级设为INFO。之后的代码是相同的。需要提示下这里我并没有调用join()方法。取而代之的:当它退出,父线程将自动调用join()方法。

 

当你这么做了,你应该得到类似下面的输出:

 

[INFO/Process-1] child process calling self.run()

tango

[INFO/Process-1] process shutting down

[INFO/Process-1] process exiting with exitcode 0

[INFO/Process-2] child process calling self.run()

[INFO/MainProcess] process shutting down

foxtrot

[INFO/Process-2] process shutting down

[INFO/Process-3] child process calling self.run()

[INFO/Process-2] process exiting with exitcode 0

10

[INFO/MainProcess] calling join() for process Process-3

[INFO/Process-3] process shutting down

[INFO/Process-3] process exiting with exitcode 0

[INFO/MainProcess] calling join() for process Process-2

 

现在如果你想要保存日志到硬盘中,那么这件事就显得有些棘手。你能在Python的logging Cookbook阅读一些有关那类话题。

 

Pool类

 

Pool类被用来代表一个工作进程池。它有让你将任务转移到工作进程的方法。让我们看下面一个非常简单的例子。

 

from multiprocessing import Pool

 

def doubler(number):

    return number * 2

 

if __name__ == ‘__main__‘:

    numbers = [5, 10, 20]

    pool = Pool(processes=3)

    print(pool.map(doubler, numbers))

 

基本上执行上述代码之后,一个Pool的实例被创建,并且该实例创建了3个工作进程。然后我们使用map 方法将一个函数和一个可迭代对象映射到每个进程。最后我们打印出这个例子的结果:[10, 20, 40]。

 

你也能通过apply_async方法获得池中进程的运行结果:

 

from multiprocessing import Pool

 

def doubler(number):

    return number * 2

 

if __name__ == ‘__main__‘:

    pool = Pool(processes=3)

    result = pool.apply_async(doubler, (25,))

    print(result.get(timeout=1))

 

我们上面做的事实际上就是请求进程的运行结果。那就是get函数的用途。它尝试去获取我们的结果。你能够注意到我们设置了timeout,这是为了预防我们调用的函数发生异常的情况。毕竟我们不想要它被无限期地阻塞。

 

进程通信

 

当遇到进程间通信的情况,multiprocessing 模块提供了两个主要的方法:Queues 和 Pipes。Queue 实现上既是线程安全的也是进程安全的。让我们看一个相当简单的并且基于 Queue的例子。代码来自于我的文章(threading articles)。

 

from multiprocessing import Process, Queue

 

sentinel = -1

 

def creator(data, q):

    """

    Creates data to be consumed and waits for the consumer

    to finish processing

    """

    print(‘Creating data and putting it on the queue‘)

    for item in data:

 

        q.put(item)

 

 

def my_consumer(q):

    """

    Consumes some data and works on it

 

    In this case, all it does is double the input

    """

    while True:

        data = q.get()

        print(‘data found to be processed: {}‘.format(data))

        processed = data * 2

        print(processed)

 

        if data is sentinel:

            break

 

 

if __name__ == ‘__main__‘:

    q = Queue()

    data = [5, 10, 13, -1]

    process_one = Process(target=creator, args=(data, q))

    process_two = Process(target=my_consumer, args=(q,))

    process_one.start()

    process_two.start()

 

    q.close()

    q.join_thread()

 

    process_one.join()

    process_two.join()

 

在这里我们只需要导入Queue和Process。Queue用来创建数据和添加数据到队列中,Process用来消耗数据并执行它。通过使用Queue的put()和get()方法,我们就能添加数据到Queue、从Queue获取数据。代码的最后一块只是创建了Queue 对象以及两个Process对象,并且运行它们。你能注意到我们在进程对象上调用join()方法,而不是在Queue本身上调用。

 

总结

 

我们这里有大量的资料。你已经学习如何使用multiprocessing模块指定不变的函数、使用Queues在进程间通信、给进程命名等很多事。在Python文档中也有很多本文没有接触到的知识点,因此也务必深入了解下文档。与此同时,你现在知道如何用Python利用你电脑所有的处理能力了!

 

若是想要一些Python学习资料的童鞋,可以私我756576218

以上是关于Python 多进程教程的主要内容,如果未能解决你的问题,请参考以下文章

Python基础教程之多线程与多进程

进程 vs. 线程(python的协程)(转廖雪峰老师python教程)

python教程map多进程与进度条

Python多进程运行——Multiprocessing基础教程2

python3教程:用concurrent执行多进程任务的方法

超简单的Python教程系列——第16篇:多进程