利用“Copy-on-Write”将数据复制到Multiprocessing.Pool()工作进程
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用“Copy-on-Write”将数据复制到Multiprocessing.Pool()工作进程相关的知识,希望对你有一定的参考价值。
我有一些multiprocessing
Python代码看起来有点像这样:
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
def __init__(self):
self.myAttribute = np.zeros(100000000) # basically a big memory struct
def my_multithreaded_analysis(self):
arg_lists = [(self, i) for i in range(10)]
pool = Pool(processes=10)
result = pool.map(call_method, arg_lists)
print result
def analyze(self, i):
time.sleep(10)
return i ** 2
def call_method(args):
my_instance, i = args
return my_instance.analyze(i)
if __name__ == '__main__':
my_instance = MyClass()
my_instance.my_multithreaded_analysis()
在阅读了关于内存如何在其他StackOverflow答案(例如Python multiprocessing memory usage)中工作的答案后,我认为这不会使用内存与我用于多处理的进程数量成比例,因为它是写时复制而我没有修改了my_instance
的任何属性。但是,当我运行顶部时,我确实看到了所有进程的高内存,它表示我的大多数进程都使用了大量内存(这是OSX的最高输出,但我可以在Linux上复制)。
我的问题基本上是,我是否正确地解释了这一点,因为我的MyClass
实例实际上是在池中重复的?如果是这样,我该如何防止这种情况;我应该不使用这样的结构吗?我的目标是减少计算分析的内存使用量。
PID COMMAND %CPU TIME #TH #WQ #PORT MEM PURG CMPRS PGRP PPID STATE
2494 Python 0.0 00:01.75 1 0 7 765M 0B 0B 2484 2484 sleeping
2493 Python 0.0 00:01.85 1 0 7 765M 0B 0B 2484 2484 sleeping
2492 Python 0.0 00:01.86 1 0 7 765M 0B 0B 2484 2484 sleeping
2491 Python 0.0 00:01.83 1 0 7 765M 0B 0B 2484 2484 sleeping
2490 Python 0.0 00:01.87 1 0 7 765M 0B 0B 2484 2484 sleeping
2489 Python 0.0 00:01.79 1 0 7 167M 0B 597M 2484 2484 sleeping
2488 Python 0.0 00:01.77 1 0 7 10M 0B 755M 2484 2484 sleeping
2487 Python 0.0 00:01.75 1 0 7 8724K 0B 756M 2484 2484 sleeping
2486 Python 0.0 00:01.78 1 0 7 9968K 0B 755M 2484 2484 sleeping
2485 Python 0.0 00:01.74 1 0 7 171M 0B 594M 2484 2484 sleeping
2484 Python 0.1 00:16.43 4 0 18 775M 0B 12K 2484 2235 sleeping
发送到pool.map
(和相关方法)的任何内容实际上并不使用共享的写时复制资源。值为"pickled" (Python's serialization mechanism),通过管道发送到工作进程并在那里进行unpickled,从头开始重建子对象。因此,在这种情况下,每个孩子最终都会得到原始数据的写时复制版本(它从未使用过,因为它被告知要使用通过IPC发送的副本),以及原始数据的个人重新创建。在孩子身上重建,不分享。
如果要利用分叉的写时复制优势,则无法通过管道发送数据(或引用数据的对象)。您必须将它们存储在可以通过访问自己的全局变量从子项中找到的位置。例如:
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
def __init__(self):
self.myAttribute = np.zeros(100000000) # basically a big memory struct
def my_multithreaded_analysis(self):
arg_lists = list(range(10)) # Don't pass self
pool = Pool(processes=10)
result = pool.map(call_method, arg_lists)
print result
def analyze(self, i):
time.sleep(10)
return i ** 2
def call_method(i):
# Implicitly use global copy of my_instance, not one passed as an argument
return my_instance.analyze(i)
# Constructed globally and unconditionally, so the instance exists
# prior to forking in commonly accessible location
my_instance = MyClass()
if __name__ == '__main__':
my_instance.my_multithreaded_analysis()
通过不传递self
,您可以避免复制,只需使用写入时复制的单个全局对象映射到子级。如果您需要多个对象,可以在创建池之前将全局list
或dict
映射到对象的实例,然后将可以查找对象的索引或键作为参数的一部分传递给pool.map
。然后,worker函数使用索引/键(必须通过IPC腌制并发送给子节点)来查找全局字典中的值(写入时复制映射)(也是写入时复制映射),所以你复制便宜的信息来查找孩子的昂贵数据而不复制它。
或者,为了利用分叉的写时复制优势,同时保留一些相似的封装,你可以使用leverage class-attributes and @classmethods over pure globals
。
import time
from multiprocessing import Pool
import numpy as np
class MyClass(object):
myAttribute = np.zeros(100000000) # basically a big memory struct
# myAttribute is a class-attribute
@classmethod
def my_multithreaded_analysis(cls):
arg_list = [i for i in range(10)]
pool = Pool(processes=10)
result = pool.map(analyze, arg_list)
print result
@classmethod
def analyze(cls, i):
time.sleep(10)
# If you wanted, you could access cls.myAttribute w/o worry here.
return i ** 2
""" We don't need this proxy step !
def call_method(args):
my_instance, i = args
return my_instance.analyze(i)
"""
if __name__ == '__main__':
my_instance = MyClass()
# Note that now you can instantiate MyClass anywhere in your app,
# While still taking advantage of copy-on-write forking
my_instance.my_multithreaded_analysis()
注1:是的,我承认class-attributes
和class-methods
是美化全球化的。但它购买了一些封装......
注意2:你可以通过将绑定实例方法arg_lists
传递给implicitly pass the instance (self),而不是明确地创建上面的Pool
,你可以analyze(self)
到Pool.map()
创建的每个任务,并且更容易射击自己的脚!
以上是关于利用“Copy-on-Write”将数据复制到Multiprocessing.Pool()工作进程的主要内容,如果未能解决你的问题,请参考以下文章