利用“写时复制”将数据复制到 Multiprocessing.Pool() 工作进程
Posted
技术标签:
【中文标题】利用“写时复制”将数据复制到 Multiprocessing.Pool() 工作进程【英文标题】:Leveraging "Copy-on-Write" to Copy Data to Multiprocessing.Pool() Worker Processes 【发布时间】:2016-10-31 06:20:53 【问题描述】:我有一点multiprocessing
Python 代码,看起来有点像这样:
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
def __init__(self):
self.myAttribute = np.zeros(100000000) # basically a big memory struct
def my_multithreaded_analysis(self):
arg_lists = [(self, i) for i in range(10)]
pool = Pool(processes=10)
result = pool.map(call_method, arg_lists)
print result
def analyze(self, i):
time.sleep(10)
return i ** 2
def call_method(args):
my_instance, i = args
return my_instance.analyze(i)
if __name__ == '__main__':
my_instance = MyClass()
my_instance.my_multithreaded_analysis()
在阅读了其他 *** 答案(例如 Python multiprocessing memory usage)中有关内存如何工作的答案后,我的印象是,这不会与我用于多处理的进程数量成比例地使用内存,因为它是复制的写,我没有修改my_instance
的任何属性。但是,当我运行 top 时,我确实看到所有进程的内存都很高,它说我的大多数进程都在使用大量内存(这是 OSX 的***输出,但我可以在 Linux 上复制)。
我的问题基本上是,我的解释是否正确,因为我的 MyClass
实例实际上在池中重复?如果是这样,我该如何防止这种情况;我不应该使用这样的结构吗?我的目标是减少计算分析的内存使用量。
PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPRS PGRP PPID STATE
2494 Python 0.0 00:01.75 1 0 7 765M 0B 0B 2484 2484 sleeping
2493 Python 0.0 00:01.85 1 0 7 765M 0B 0B 2484 2484 sleeping
2492 Python 0.0 00:01.86 1 0 7 765M 0B 0B 2484 2484 sleeping
2491 Python 0.0 00:01.83 1 0 7 765M 0B 0B 2484 2484 sleeping
2490 Python 0.0 00:01.87 1 0 7 765M 0B 0B 2484 2484 sleeping
2489 Python 0.0 00:01.79 1 0 7 167M 0B 597M 2484 2484 sleeping
2488 Python 0.0 00:01.77 1 0 7 10M 0B 755M 2484 2484 sleeping
2487 Python 0.0 00:01.75 1 0 7 8724K 0B 756M 2484 2484 sleeping
2486 Python 0.0 00:01.78 1 0 7 9968K 0B 755M 2484 2484 sleeping
2485 Python 0.0 00:01.74 1 0 7 171M 0B 594M 2484 2484 sleeping
2484 Python 0.1 00:16.43 4 0 18 775M 0B 12K 2484 2235 sleeping
【问题讨论】:
您是如何生成此分析器结果的? 【参考方案1】:发送到pool.map
(和相关方法)的任何内容实际上都没有使用共享的写时复制资源。值是"pickled" (Python's serialization mechanism),通过管道发送到工作进程并在那里解压,从头开始重建子对象中的对象。因此,在这种情况下,每个孩子最终都会得到原始数据的写入时复制版本(它从未使用过,因为它被告知使用通过 IPC 发送的副本),以及原始数据的个人重建在孩子中重建,不共享。
如果您想利用分叉的写时复制优势,您不能通过管道发送数据(或引用数据的对象)。您必须将它们存储在可以通过访问孩子自己的全局变量从孩子那里找到的位置。比如:
import os
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
def __init__(self):
self.myAttribute = os.urandom(1024*1024*1024) # basically a big memory struct(~1GB size)
def my_multithreaded_analysis(self):
arg_lists = list(range(10)) # Don't pass self
pool = Pool(processes=10)
result = pool.map(call_method, arg_lists)
print result
def analyze(self, i):
time.sleep(10)
return i ** 2
def call_method(i):
# Implicitly use global copy of my_instance, not one passed as an argument
return my_instance.analyze(i)
# Constructed globally and unconditionally, so the instance exists
# prior to forking in commonly accessible location
my_instance = MyClass()
if __name__ == '__main__':
my_instance.my_multithreaded_analysis()
通过不传递self
,您可以避免复制,而只需使用写入时复制映射到子级的单个全局对象。如果您需要多个对象,则可以在创建池之前将全局 list
或 dict
映射到对象的实例,然后将可以查找对象的索引或键作为参数的一部分传递( s) 到pool.map
。然后,工作函数使用索引/键(必须通过 IPC 腌制并发送给子级)来查找全局 dict(也是写时复制映射)中的值(写时复制映射),因此,您复制廉价信息以在子项中查找昂贵的数据而不复制它。
如果对象很小,即使您不给它们写信,它们最终也会被复制。 CPython 是引用计数的,引用计数出现在公共对象标头中并不断更新,只需引用对象即可,即使它是逻辑上非可变引用。如此小的对象(以及分配在同一内存页中的所有其他对象)将被写入,因此被复制。对于大型对象(您的一亿个元素 numpy 数组),只要您不写入它,它的大部分将保持共享,因为标题只占用许多页面之一
在 python 版本 3.8 中更改:在 macOS 上,spawn start 方法现在是默认方法。见mulitprocessing doc。 Spawn 没有利用写时复制。
【讨论】:
另请注意:如果对象很小,即使您不写入它们,它们最终也会被复制。 CPython 是引用计数的,引用计数出现在公共对象标头中并不断更新,只需引用对象即可,即使它是逻辑上非可变引用。如此小的对象(以及分配在同一内存页中的所有其他对象)将被写入,因此被复制。对于大型对象(您的亿个元素numpy
数组),只要您不写入,其中大部分将保持共享,因为标题只占用许多页面之一。
我已将您的评论合并到答案正文中。该语句的含义是,对于普通 Python 数据结构(列表、字典等),在子进程中的引用点触发一个副本,因此您最好将结构作为方法参数显式传递并完成它。你知道是否有办法防止这种行为吗?
@iruvar:通过 COW 复制它仍然比腌制它,通过管道发送,然后在另一边解开它更便宜。并且任何未被实际引用的东西(在父级中创建且未在工作器中加载的数据)都不会被复制。 “防止”这种行为的唯一方法是使用非 CPython 解释器(尽管它们的 GC 过程可能会触发类似的行为),或者使用非fork
start 方法(所以你必须通过酸洗发送东西,但至少你得到的可能被复制的东西要少得多)。
最简单的,也许过于简单化的底线:对任何你不想被复制和腌制的东西使用全局变量。
@dre-hh:macOS defaults to using the 'spawn'
method instead of 'fork'
starting in 3.8, because macOS system frameworks are not fork
-safe。 'spawn'
的工作方式与'fork'
的工作方式非常不同(它做了很多东西来模拟分叉,但根本不涉及 COW)。您可以随时尝试选择使用'fork'
start 方法(如果您在fork
时间不走运,可能会导致代码崩溃)。【参考方案2】:
或者,为了利用分叉的写时复制优势,同时保留一些封装的外观,您可以leverage class-attributes and @classmethods over pure globals
。
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
myAttribute = np.zeros(100000000) # basically a big memory struct
# myAttribute is a class-attribute
@classmethod
def my_multithreaded_analysis(cls):
arg_list = [i for i in range(10)]
pool = Pool(processes=10)
result = pool.map(analyze, arg_list)
print result
@classmethod
def analyze(cls, i):
time.sleep(10)
# If you wanted, you could access cls.myAttribute w/o worry here.
return i ** 2
""" We don't need this proxy step !
def call_method(args):
my_instance, i = args
return my_instance.analyze(i)
"""
if __name__ == '__main__':
my_instance = MyClass()
# Note that now you can instantiate MyClass anywhere in your app,
# While still taking advantage of copy-on-write forking
my_instance.my_multithreaded_analysis()
注 1: 是的,我承认 class-attributes
和 class-methods
是美化的全局变量。但它需要一点封装......
注意 2: 您可以通过传递绑定实例方法 analyze(self)
,将 implicitly pass the instance (self) 传递给 Pool
创建的每个任务,而不是在上面显式创建 arg_lists
到Pool.map()
,然后更轻松地朝自己的脚开枪!
【讨论】:
以上是关于利用“写时复制”将数据复制到 Multiprocessing.Pool() 工作进程的主要内容,如果未能解决你的问题,请参考以下文章