具有大对象的 Python 多处理:防止对象的复制/序列化
Posted
技术标签:
【中文标题】具有大对象的 Python 多处理:防止对象的复制/序列化【英文标题】:Python multiprocessing with large objects: prevent copying/serialization of object 【发布时间】:2020-05-01 00:09:28 【问题描述】:我已经针对较大对象的一些问题实现了多处理,如下所示:
import time
import pathos.multiprocessing as mp
from functools import partial
from random import randrange
class RandomNumber():
def __init__(self, object_size=100):
self.size = bytearray(object_size*10**6) # 100 MB size
self.foo = None
def do_something(self, *args, **kwargs):
self.foo = randrange(1, 10)
time.sleep(0.5) # wait for 0.5 seconds
return self
def wrapper(random_number, *args, **kwargs):
return random_number.do_something(*args, **kwargs)
if __name__ == '__main__':
# create data
numbers = [RandomNumber() for m in range(0, 9)]
kwds = 'add': randrange(1, 10)
# calculate
pool = mp.Pool(processes=mp.cpu_count())
result = pool.map_async(partial(wrapper, **kwds), numbers)
try:
result = result.get()
except:
pass
# print result
my_results = [i.foo for i in result]
print(my_results)
pool.close()
pool.join()
产生类似的东西:
[8, 7, 8, 3, 1, 2, 6, 4, 8]
现在的问题是,当对象非常小时,与使用列表推导相比,我在性能上有了巨大的改进,而这种改进在更大的对象大小时变成了相反的情况,例如100 MB 或更大。
从documentation 和其他问题中,我发现这是由于使用 pickle/dill 对单个对象进行序列化以便将它们传递给池中的工作人员造成的。换句话说:对象被复制,这个 IO 操作成为瓶颈,因为它比实际计算更耗时。
我已经尝试使用multiprocessing.Manager 处理同一个对象,但这会导致运行时间更长。
问题是我被绑定到一个特定的类结构(这里通过RandomNumber()
表示),我无法改变它..
现在我的问题是:是否有任何方法或概念来规避这种行为,并且只在do_something()
上调用我,而无需序列化或复制的开销?
欢迎任何提示。提前致谢!
【问题讨论】:
这是否涉及 IO 或 CPU 密集型任务 这是一个占用大量 CPU 资源的任务。我将数学求解器应用于特定问题,而数据负责对象的内存大小。 【参考方案1】:您需要使用Batch processing
。不要为每个号码创建销毁工人。
根据cpu_count
制作有限的工人。然后将一个列表传递给每个工作并处理它们。使用map
并传递一个包含batches
数字的列表。
【讨论】:
您能否提供一个最小示例或有关如何实现此功能的链接? @CordKaldemeyer 我不在电脑前....将尝试给出一个想法....从您的主列表创建子列表....现在将每个子列表传递给 map....in您的函数在 Len 列出并执行某些操作时执行...一旦您运行此命令...您将获得批处理或子列表的最佳大小.... 当开始运行这个......玩batchsize......你最终会找到这个任务的正确大小【参考方案2】:我从concurrent.futures 库中找到了一个使用多处理或多线程的解决方案,它不需要腌制对象。在我的例子中,使用ThreadPoolExecutor
的多线程比使用ProcessPoolExecutor
的多线程带来了明显的优势。
import time
from random import randrange
import concurrent.futures as cf
class RandomNumber():
def __init__(self, object_size=100):
self.size = bytearray(object_size*10**6) # 100 MB size
self.foo = None
def do_something(self, *args, **kwargs):
self.foo = randrange(1, 10)
time.sleep(0.5) # wait for 0.5 seconds
return self
def wrapper(random_number, *args, **kwargs):
return random_number.do_something(*args, **kwargs)
if __name__ == '__main__':
# create data
numbers = [RandomNumber() for m in range(0, 100)]
kwds = 'add': randrange(1, 10)
# run
with cf.ThreadPoolExecutor(max_workers=3) as executor:
result = executor.map(wrapper, numbers, timeout=5*60)
# print result
my_results = [i.foo for i in result]
print(my_results)
产量:
[3, 3, 1, 1, 3, 7, 7, 6, 7, 5, 9, 5, 6, 5, 6, 9, 1, 5, 1, 7, 5, 3, 6, 2, 9, 2, 1, 2, 5, 1, 7, 9, 2, 9, 4, 9, 8, 5, 2, 1, 7, 8, 5, 1, 4, 5, 8, 2, 2, 5, 3, 6, 3, 2, 5, 3, 1, 9, 6, 7, 2, 4, 1, 5, 4, 4, 4, 9, 3, 1, 5, 6, 6, 8, 4, 4, 8, 7, 5, 9, 7, 8, 6, 2, 3, 1, 7, 2, 4, 8, 3, 6, 4, 1, 7, 7, 3, 4, 1, 2]
real 0m21.100s
user 0m1.100s
sys 0m2.896s
尽管如此,如果我有太多对象(此处为numbers
),这仍然会导致内存泄漏,并且如果必须交换内存(即系统冻结直到任务完成。
关于如何防止这种情况的任何提示?
【讨论】:
以上是关于具有大对象的 Python 多处理:防止对象的复制/序列化的主要内容,如果未能解决你的问题,请参考以下文章
python中的多处理-在多个进程之间共享大对象(例如pandas数据框)