具有多处理功能的 Python itertools - 巨大的列表与使用迭代器的 CPU 使用效率低下
Posted
技术标签:
【中文标题】具有多处理功能的 Python itertools - 巨大的列表与使用迭代器的 CPU 使用效率低下【英文标题】:Python itertools with multiprocessing - huge list vs inefficient CPUs usage with iterator 【发布时间】:2016-08-08 22:00:29 【问题描述】:我处理 n 个元素(下面称为“对”)变体,并将重复用作我的函数的参数。显然,只要“r”列表不足以消耗所有内存,一切都会正常工作。问题是我最终必须为 6 个元素重复 16 次以上。为此,我在云中使用 40 核系统。
代码如下所示:
if __name__ == '__main__':
pool = Pool(39)
r = itertools.product(pairs,repeat=16)
pool.map(f, r)
我相信我应该使用迭代器而不是预先创建巨大的列表,问题就从这里开始了..
我尝试使用以下代码解决问题:
if __name__ == '__main__':
pool = Pool(39)
for r in itertools.product(pairs,repeat=14):
pool.map(f, r)
内存问题消失了,但 CPU 使用率约为每个内核 5%。现在单核版本的代码比这个快。
如果你能指导我一下,我真的很感激..
谢谢。
【问题讨论】:
旁注:如果您使用现代 Python(Python 3.3 或更高版本),最好将Pool
与 with
语句一起使用,这样Pool
工作人员就可以按预期清理.只需将pool = Pool(39)
更改为with Pool(39) as pool:
并缩进它下面使用池的行;当块退出时,工人立即被清理。
【参考方案1】:
您的原始代码没有在您自己的代码中预先创建list
(itertools.product
返回一个生成器),但pool.map
正在实现整个生成器(因为它假设您是否可以存储所有输出,您可以也存储所有输入)。
不要在这里使用pool.map
。如果您需要排序结果,请使用pool.imap
,或者如果结果顺序不重要,请使用pool.imap_unordered
。迭代任一调用的结果(不要包含在 list
中),并在结果出现时对其进行处理,内存应该不是问题:
if __name__ == '__main__':
pool = Pool(39)
for result in pool.imap(f, itertools.product(pairs, repeat=16)):
print(result)
如果您使用pool.map
来处理副作用,那么您只需运行它直到完成,但结果和排序并不重要,您可以通过使用imap_unordered
和使用collections.deque
显着提高性能在不实际存储任何内容的情况下有效地排出“结果”(deque
和 maxlen
的 0
是强制迭代器运行完成而不存储结果的最快、最低内存方式):
from collections import deque
if __name__ == '__main__':
pool = Pool(39)
deque(pool.imap_unordered(f, itertools.product(pairs, repeat=16)), 0)
最后,我有点怀疑指定 39 Pool
工人; multiprocessing
对 CPU 密集型任务非常有利;如果您使用的工作线程数超过了 CPU 内核数并获得了好处,那么multiprocessing
可能会在 IPC 上花费比获得的更多,而使用更多工作线程只是通过缓冲更多数据来掩盖问题。
如果您的工作主要受 I/O 限制,您可以尝试使用基于线程的池,这将避免酸洗和解酸的开销,以及父进程和子进程之间的 IPC 成本。与基于进程的池不同,Python 线程受GIL 问题的影响,因此您的CPU 绑定在Python 中工作(不包括GIL 释放I/O 调用、ctypes
对.dll/.so 文件的调用以及某些第三方扩展,如numpy
发布 GIL 用于繁重的 CPU 工作)仅限于单核(在 Python 2.x 中用于 CPU 绑定的工作,你经常浪费大量的解决 GIL 争用和执行上下文切换;Python 3 删除了大部分垃圾)。但是,如果您的工作主要受 I/O 限制,则 I/O 阻塞会释放 GIL 以允许其他线程运行,因此您可以拥有许多线程,只要它们中的大多数延迟 I/O。切换也很容易(只要您没有将程序设计为依赖于每个工作人员的单独地址空间,假设您可以写入“共享”状态并且不影响其他工作人员或父进程),只需更改:
from multiprocessing import Pool
到:
from multiprocessing.dummy import Pool
你会得到multiprocessing.dummy
版本的池,它基于线程而不是进程。
【讨论】:
感谢您的澄清。我已经尝试了这两个选项,对于这两个选项,第一个进程显示为 150% 的 CPU 利用率(在顶部),其余进程只有 40% 忙,一旦进程数量增加,它就会急剧下降(最多 17 39 个进程的百分比 - 对于 40 个 vcpus)。如何提高效率? @xis_one:可能有帮助的一件事是将 >1chunksize
传递给 imap
/imap_unordered
,这样在工人必须再次阻止 IPC 之前,他们需要做更多的工作。更复杂但通常更好的选择是让工人自己产生一些工作,例如如果pairs
是一个全局变量,您可以imap
为product(pairs, repeat=10)
工作,然后让每个工作人员生成所有最后6 个可能的项目,例如for workitem in map(workerarg.__add__, product(pairs, repeat=6)):
,从而减少了执行单个任务必须传输的数据量。
注意:我上一条评论中的map
将是普通的内置map
,而不是池映射。如果您使用的是 Python 2,则需要执行 from future_builtins import map
以获取基于 map
的 Py3 生成器,以避免出现巨大的 list
问题。
附加说明:如果您将一些工作生成推送到子进程,它们将返回值的集合而不是单个值。在这种情况下,为了使它仍然表现得像一次只获得一个值,您可能需要考虑将imap*
调用包装在itertools.chain.from_iterable
中,以便它从list
/@ 的迭代器转换而来987654361@s 到基础值的迭代器。【参考方案2】:
第二个代码示例速度较慢,因为您将单对提交到包含 39 个作品的池中。只有一名工作人员会处理您的请求,而其他 38 名工作人员将无所事事!会更慢,因为您将数据从主线程传输到工作进程会有开销。
您可以“缓冲”一些对,然后执行这组对以平衡内存使用,但仍然可以利用多进程环境。
import itertools
from multiprocessing import Pool
def foo(x):
return sum(x)
cpus = 3
pool = Pool(cpus)
# 10 is buffer size multiplier - the number of pair that each process will get
buff_size = 10*cpus
buff = []
for i, r in enumerate(itertools.product(range(20), range(10))):
if (i % buff_size) == (buff_size-1):
print pool.map(foo, buff)
buff = []
else:
buff.append(r)
if len(buff) > 0:
print pool.map(foo, buff)
buff = []
上面的输出会是这样的
[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 5, 6, 7, 8, 9, 10, 11, 12, 13]
[6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 8, 9, 10, 11, 12, 13, 14, 15, 16]
[9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 14, 15, 16, 17, 18, 19, 20, 21, 22]
[15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 17, 18, 19, 20, 21, 22, 23, 24, 25]
[18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]
使用buff_size
乘数来获得适合您系统的平衡!
【讨论】:
以上是关于具有多处理功能的 Python itertools - 巨大的列表与使用迭代器的 CPU 使用效率低下的主要内容,如果未能解决你的问题,请参考以下文章
《笔记》python itertools的groupby分组数据处理
Python itertools.product 具有任意数量的集合