Python多处理:我可以使用更新的全局变量重用进程(已经并行化的函数)吗?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Python多处理:我可以使用更新的全局变量重用进程(已经并行化的函数)吗?相关的知识,希望对你有一定的参考价值。

首先让我告诉你我目前的设置:

import multiprocessing.pool
from contextlib import closing
import os

def big_function(param):
   process(another_module.global_variable[param])


def dispatcher():
    # sharing read-only global variable taking benefit from Unix
    # which follows policy copy-on-update
    # https://stackoverflow.com/questions/19366259/
    another_module.global_variable = huge_list

    # send indices
    params = range(len(another_module.global_variable))

    with closing(multiprocessing.pool.Pool(processes=os.cpu_count())) as p:
        multiprocessing_result = list(p.imap_unordered(big_function, params))

    return multiprocessing_result

在这里我使用共享变量在创建进程池之前更新,其中包含大量数据,这确实让我获得了加速,因此现在似乎没有被腌制。此变量也属于导入模块的范围(如果它很重要)。

当我尝试创建这样的设置时:

another_module.global_variable = []

p = multiprocessing.pool.Pool(processes=os.cpu_count())

def dispatcher():
    # sharing read-only global variable taking benefit from Unix
    # which follows policy copy-on-update
    # https://stackoverflow.com/questions/19366259/
    another_module_global_variable = huge_list

    # send indices
    params = range(len(another_module.global_variable))

    multiprocessing_result = list(p.imap_unordered(big_function, params))

    return multiprocessing_result  

p“记得”全球共享列表是空的,并且在从调度员内部调用时拒绝使用新数据。


现在问题是:使用上面的第一个设置在8个核心上处理~600个数据对象,我的并行计算运行8秒,而单线程运行12秒。

这就是我的想法:只要多处理pickle数据,我需要每次重新创建进程,我需要pickle函数big_function(),所以我浪费时间。使用全局变量部分地解决了数据的情况(但我仍然需要在每次更新时重新创建池)。

我可以用big_function()的实例做什么(这取决于其他模块的许多其他功能,numpy等)?我可以一劳永逸地创建它的副本os.cpu_count(),并以某种方式将新数据提供给它们并获得结果,重用工人?

答案

只是为了回顾'记住'问题:

another_module.global_variable = []
p = multiprocessing.pool.Pool(processes=os.cpu_count())

def dispatcher():
    another_module_global_variable = huge_list
    params = range(len(another_module.global_variable))
    multiprocessing_result = list(p.imap_unordered(big_function, params))
    return multiprocessing_result 

什么似乎是问题是当你创建Pool实例。

这是为什么?

这是因为当你创建Pool的实例时,它确实设置了一些工作者(默认情况下等于一些CPU核心),并且它们在那时都被启动(分叉)。这意味着工作人员拥有父母全球状态的副本(以及其他所有的another_module.global_variable),以及写入时写入策略,当您更新another_module.global_variable的值时,您可以在父进程中更改它。工人可以参考旧的价值。这就是你遇到问题的原因。

这里有几个链接可以给你更多的解释:thisthis

这是一个小片段,您可以在其中切换全局变量值更改的位置以及启动进程的位置,并检查子进程中打印的内容。

from __future__ import print_function
import multiprocessing as mp

glob = dict()
glob[0] = [1, 2, 3]


def printer(a):
    print(globals())
    print(a, glob[0])


if __name__ == '__main__':
    p = mp.Process(target=printer, args=(1,))
    p.start()
    glob[0] = 'test'
    p.join()

这是Python2.7代码,但它也适用于Python3.6。

这个问题的解决方案是什么?

好吧,回到第一个解决方案。您更新导入的模块的变量的值,然后创建进程池。


现在真正的问题是缺乏加速。

以下是documentation关于如何腌制函数的有趣部分:

请注意,函数(内置和用户定义)由“完全限定”的名称引用而非值引用。这意味着只有函数名称被腌制,以及定义函数的模块的名称。函数的代码或其任何函数属性都不会被pickle。因此,定义模块必须可以在unpickling环境中导入,并且模块必须包含命名对象,否则将引发异常。

这意味着你的功能酸洗不应该是浪费时间的过程,或者至少不是它本身。导致加速不足的原因是,对于传递给imap_unordered调用的列表中的~600个数据对象,您将每个数据对象传递给一个工作进程。再次,multiprocessing.Pool的潜在实施可能是这个问题的原因。

如果你深入了解multiprocessing.Pool的实现,你会看到使用Threads的两个Queue正在处理父和所有子(工)进程之间的通信。因此,所有进程都不断需要函数参数并不断返回响应,最终会导致父进程非常繁忙。这就是为什么'花费很多'时间花在做'调度'工作来传递数据到工作进程和从工作进程传递数据的原因。

该怎么办?

尝试随时增加工作进程中进程的数据对象数。在您的示例中,您将传递一个数据对象,并且可以确保每个工作进程在任何时候都只处理一个数据对象。为什么不增加传递给工作进程的数据对象的数量?这样,您可以通过处理10个,20个甚至更多数据对象来使每个过程更加繁忙。从我所看到的,imap_unordered有一个chunksize论点。它默认设置为1。尝试增加它。像这样的东西:

import multiprocessing.pool
from contextlib import closing
import os

def big_function(params):
   results = []
   for p in params:
       results.append(process(another_module.global_variable[p]))
   return results

def dispatcher():
    # sharing read-only global variable taking benefit from Unix
    # which follows policy copy-on-update
    # https://stackoverflow.com/questions/19366259/
    another_module.global_variable = huge_list

    # send indices
    params = range(len(another_module.global_variable))

    with closing(multiprocessing.pool.Pool(processes=os.cpu_count())) as p:
        multiprocessing_result = list(p.imap_unordered(big_function, params, chunksize=10))

    return multiprocessing_result

几个建议:

  1. 我看到你创建params作为索引列表,用于在big_function中选择特定的数据对象。您可以创建表示第一个和最后一个索引的元组,并将它们传递给big_function。这可以成为增加工作量的一种方式。这是我上面提出的方法的另一种方法。
  2. 除非你明确喜欢使用Pool(processes=os.cpu_count()),否则你可以省略它。它默认采用CPU核心数。

很抱歉答案的长度或可能潜入的任何拼写错误。

以上是关于Python多处理:我可以使用更新的全局变量重用进程(已经并行化的函数)吗?的主要内容,如果未能解决你的问题,请参考以下文章

全局变量和 Python 多处理 [重复]

python多处理子进程无法访问全局变量

如何在 Python 中的多处理期间访问全局变量 [重复]

python中的多处理模块和修改共享的全局变量

labview全局变量刷新慢咋办

python多处理是否通过全局标志变量安全地进行进程间信令?