多处理只读共享内存
Posted
技术标签:
【中文标题】多处理只读共享内存【英文标题】:multiprocessing read-only shared memory 【发布时间】:2021-10-15 07:03:41 【问题描述】:我已经找到了这个问题的一些答案,但是我仍然对整个多处理感到困惑。我正在尝试并行化我的程序。为了简化它,我有 2 个类 Problem
和 SubProblem
。 Problem
类在 SubProblem
类的 6 个实例上调用 solve_
方法,目前已连续解决。我相信并行解决这些实例会有所收获。
class Problem():
def __init__(self, data):
self.data = data
self.sub_pbs = i: SubProblem(data) for i in range(range(6)
def run(self):
dic_ = self.do_some_analysis() # does some analysis with self.data
result = []
for k, sp in self.sub_pbs.items():
result.append(sp.solve_(dic_, k))
return result
类SubProblem
如下:
class SubProblem:
def __init__(self,data):
self.data= self.retrieve_interesting_data(data)
def solve_(self, dic_ k):
solutions = []
# do some stuff and call many other functions and store the result in solutions
return solutions
我尝试并行化我的代码的方式(Problem
类中的run
函数)如下:
import concurrent.futures
def run(self):
dic_ = self.do_some_analysis() # does some analysis with self.data
res = []
with concurrent.futures.ProcessPoolExecutor() as executor:
results = [executor.submit(sp.solve_,dic_, k) for k, sp in self.sub_pbs.items()]
res= [f.result for f in results]
return res
真正的代码要复杂得多。现在以这种方式并行化它之后,事实证明它比串行解决它要慢。我运行分析器,我发现 _thread.Lock 对象的方法 acquire() 花费了很多时间。我认为这可能是因为访问了子问题/进程之间共享的数据。
为了运行solve_
,子问题需要两种类型的数据:一些数据所有子问题都应该可以访问它(一种全局数据,是子问题属性的一部分,但也作为参数传递给solve_
函数),以及特定于每个子问题的其他一些数据,这些数据是子问题属性的一部分,也作为参数传递给 solve
函数。但是,所有这些数据都不会在任何子问题/过程中被修改。
现在我的问题是,我应该如何更改我的代码,以便不会为每个进程复制所有子问题/进程需要访问的数据?有没有关于如何有效地将这些数据传递给进程的提示?
【问题讨论】:
您不需要复制太多数据。例如,我有一个图像缩略图比较器。在多处理开始之前,所有缩略图都已加载,并且主线程通过队列将其工作作为对象元组提供给每个进程。 (我使用multiprocessing.Process
,而不是进程池)。在挂钟经过的时间里,多进程版本比线程快大约 6 倍。
【参考方案1】:
首先,您当前的SubProblem.solve
方法只显示参数k(除了self);传递给它的 dic_
值没有参数。
多处理会带来直接的非多处理所没有的开销,即创建进程的成本以及将值从一个地址空间移动到另一个地址空间的成本。这个成本是值得的,但只有当被调用的“worker”函数,在这种情况下的各种sp.solve
方法,是 CPU 密集型的,以至于你通过并行运行它们所获得的收益超过了上述成本。因此,您的特定方法可能不适合多处理。
处理dic_
最有效的方法是根本不复制它,也就是说,在共享内存中创建它。但是为了能够做到这一点,我需要更多地了解该数据的结构,并且该结构不能是任意的;仅支持数量相当有限的共享内存类型,例如Array
类型。还有“托管”类型(请参阅调用multiprocessing.Manager()
返回的multiprocessing.managers.SyncManager
)。但是访问这些托管类型可能会很昂贵。
但是您可以做一些事情来减少开销。首先,不要创建比您需要的更大的池。您将提交 6 个任务,但您的代码将根据您拥有的 CPU 内核数量创建一个具有默认池大小的池。如果您有 12 个核心,您将创建 6 个未使用的进程。
让我们考虑相反的问题。假设您只有 4 个核心,因此您将创建一个包含 4 个进程的池。您将拨打submit
并通过dic_
6 次。如果dic_
是一小段数据,真的没必要优化这个。但是如果dic_
从一个地址空间移动到另一个地址空间的成本很高,那么最好将这些数据从主进程复制一次到池中的每个进程,并将其作为全局数据存储在每个进程的地址空间中。因此,您将保存 2 次移动此数据。对于这个特定程序来说,这不会是一个很大的节省,但它是您工具箱中的一项非常宝贵的技术。
import concurrent.futures
from os import cpu_count
def init_pool(dic):
global dic_
dic_ = dic
class Problem():
def __init__(self, data):
self.data = data
self.sub_pbs = i: SubProblem(data) for i in range(range(6))
def run(self):
dic_ = self.do_some_analysis() # does some analysis with self.data
# Don't create a pool larger than what you need:
pool_size = min(len(self.sub_pbs.items()), cpu_count())
with concurrent.futures.ProcessPoolExecutor(max_workers=pool_size, initializer=init_pool, initargs=(dic_,)) as executor:
results = [executor.submit(sp.solve_, k) for k, sp in self.sub_pbs.items()]
res= [f.result() for f in results]
return res
class SubProblem:
def __init__(self, data):
self.data = self.retrieve_interesting_data(data)
def solve_(self, k):
# dic_ is now global data (presumably read/only)
global dic_
solutions = []
# do some stuff and call many other functions and store the result in solutions
return solutions
【讨论】:
【参考方案2】:使用线程(共享内存)而不是单独的进程可能会更好,尤其是在您不修改共享数据的情况下。
这可能就像将“ProcessPoolExecutor”更改为“ThreadPoolExecutor”一样简单
这实际上取决于您在问题和子问题中的分析正在做什么。如果它使用 numpy 或 pandas,它们会释放 python GIL 并且可以从线程中受益,或者如果它主要等待 I/O。
【讨论】:
solve
函数受 CPU 限制,所以我认为线程不是一个选项。以上是关于多处理只读共享内存的主要内容,如果未能解决你的问题,请参考以下文章
尝试使用 CreateFileMapping 和自定义 DACL 创建只读共享内存区域在 OpenFileMapping 中失败