具有多处理功能的 Python itertools - 巨大的列表与使用迭代器的 CPU 使用效率低下

Posted

技术标签:

【中文标题】具有多处理功能的 Python itertools - 巨大的列表与使用迭代器的 CPU 使用效率低下【英文标题】:Python itertools with multiprocessing - huge list vs inefficient CPUs usage with iterator 【发布时间】:2016-08-08 22:00:29 【问题描述】:

我处理 n 个元素(下面称为“对”)变体,并将重复用作我的函数的参数。显然,只要“r”列表不足以消耗所有内存,一切都会正常工作。问题是我最终必须为 6 个元素重复 16 次以上。为此,我在云中使用 40 核系统。

代码如下所示:

if __name__ == '__main__':
  pool = Pool(39)
  r = itertools.product(pairs,repeat=16)
  pool.map(f, r)

我相信我应该使用迭代器而不是预先创建巨大的列表,问题就从这里开始了..

我尝试使用以下代码解决问题:

if __name__ == '__main__':
  pool = Pool(39)
  for r in itertools.product(pairs,repeat=14):
    pool.map(f, r)

内存问题消失了,但 CPU 使用率约为每个内核 5%。现在单核版本的代码比这个快。

如果你能指导我一下,我真的很感激..

谢谢。

【问题讨论】:

旁注:如果您使用现代 Python(Python 3.3 或更高版本),最好将 Poolwith 语句一起使用,这样Pool 工作人员就可以按预期清理.只需将pool = Pool(39) 更改为with Pool(39) as pool: 并缩进它下面使用池的行;当块退出时,工人立即被清理。 【参考方案1】:

您的原始代码没有在您自己的代码中预先创建listitertools.product 返回一个生成器),但pool.map 正在实现整个生成器(因为它假设您是否可以存储所有输出,您可以也存储所有输入)。

不要在这里使用pool.map。如果您需要排序结果,请使用pool.imap,或者如果结果顺序不重要,请使用pool.imap_unordered。迭代任一调用的结果(不要包含在 list 中),并在结果出现时对其进行处理,内存应该不是问题:

if __name__ == '__main__':
    pool = Pool(39)
    for result in pool.imap(f, itertools.product(pairs, repeat=16)):
        print(result)

如果您使用pool.map 来处理副作用,那么您只需运行它直到完成,但结果和排序并不重要,您可以通过使用imap_unordered 和使用collections.deque 显着提高性能在不实际存储任何内容的情况下有效地排出“结果”(dequemaxlen0 是强制迭代器运行完成而不存储结果的最快、最低内存方式):

from collections import deque

if __name__ == '__main__':
    pool = Pool(39)
    deque(pool.imap_unordered(f, itertools.product(pairs, repeat=16)), 0)

最后,我有点怀疑指定 39 Pool 工人; multiprocessing 对 CPU 密集型任务非常有利;如果您使用的工作线程数超过了 CPU 内核数并获得了好处,那么multiprocessing 可能会在 IPC 上花费比获得的更多,而使用更多工作线程只是通过缓冲更多数据来掩盖问题。

如果您的工作主要受 I/O 限制,您可以尝试使用基于线程的池,这将避免酸洗和解酸的开销,以及父进程和子进程之间的 IPC 成本。与基于进程的池不同,Python 线程受GIL 问题的影响,因此您的CPU 绑定在Python 中工作(不包括GIL 释放I/O 调用、ctypes 对.dll/.so 文件的调用以及某些第三方扩展,如numpy 发布 GIL 用于繁重的 CPU 工作)仅限于单核(在 Python 2.x 中用于 CPU 绑定的工作,你经常浪费大量的解决 GIL 争用和执行上下文切换;Python 3 删除了大部分垃圾)。但是,如果您的工作主要受 I/O 限制,则 I/O 阻塞会释放 GIL 以允许其他线程运行,因此您可以拥有许多线程,只要它们中的大多数延迟 I/O。切换也很容易(只要您没有将程序设计为依赖于每个工作人员的单独地址空间,假设您可以写入“共享”状态并且不影响其他工作人员或父进程),只需更改:

from multiprocessing import Pool

到:

from multiprocessing.dummy import Pool

你会得到multiprocessing.dummy 版本的池,它基于线程而不是进程。

【讨论】:

感谢您的澄清。我已经尝试了这两个选项,对于这两个选项,第一个进程显示为 150% 的 CPU 利用率(在顶部),其余进程只有 40% 忙,一旦进程数量增加,它就会急剧下降(最多 17 39 个进程的百分比 - 对于 40 个 vcpus)。如何提高效率? @xis_one:可能有帮助的一件事是将 >1 chunksize 传递给 imap/imap_unordered,这样在工人必须再次阻止 IPC 之前,他们需要做更多的工作。更复杂但通常更好的选择是让工人自己产生一些工作,例如如果pairs 是一个全局变量,您可以imapproduct(pairs, repeat=10) 工作,然后让每个工作人员生成所有最后6 个可能的项目,例如for workitem in map(workerarg.__add__, product(pairs, repeat=6)):,从而减少了执行单个任务必须传输的数据量。 注意:我上一条评论中的map 将是普通的内置map,而不是池映射。如果您使用的是 Python 2,则需要执行 from future_builtins import map 以获取基于 map 的 Py3 生成器,以避免出现巨大的 list 问题。 附加说明:如果您将一些工作生成推送到子进程,它们将返回值的集合而不是单个值。在这种情况下,为了使它仍然表现得像一次只获得一个值,您可能需要考虑将imap* 调用包装在itertools.chain.from_iterable 中,以便它从list/@ 的迭代器转换而来987654361@s 到基础值的迭代器。【参考方案2】:

第二个代码示例速度较慢,因为您将单对提交到包含 39 个作品的池中。只有一名工作人员会处理您的请求,而其他 38 名工作人员将无所事事!会更慢,因为您将数据从主线程传输到工作进程会有开销。

您可以“缓冲”一些对,然后执行这组对以平衡内存使用,但仍然可以利用多进程环境。

import itertools
from multiprocessing import Pool

def foo(x):
    return sum(x)

cpus = 3
pool = Pool(cpus)
# 10 is buffer size multiplier - the number of pair that each process will get
buff_size = 10*cpus  
buff = []
for i, r in enumerate(itertools.product(range(20), range(10))):
    if (i % buff_size) == (buff_size-1):
        print pool.map(foo, buff)
        buff = []
    else:
        buff.append(r)

if len(buff) > 0:
    print pool.map(foo, buff)
    buff = []

上面的输出会是这样的

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 2, 3, 4, 5, 6, 7, 8, 9, 10]
[3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 5, 6, 7, 8, 9, 10, 11, 12, 13]
[6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 8, 9, 10, 11, 12, 13, 14, 15, 16]
[9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 11, 12, 13, 14, 15, 16, 17, 18, 19]
[12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 14, 15, 16, 17, 18, 19, 20, 21, 22]
[15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 17, 18, 19, 20, 21, 22, 23, 24, 25]
[18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28]

使用buff_size 乘数来获得适合您系统的平衡!

【讨论】:

以上是关于具有多处理功能的 Python itertools - 巨大的列表与使用迭代器的 CPU 使用效率低下的主要内容,如果未能解决你的问题,请参考以下文章

《笔记》python itertools的groupby分组数据处理

Python itertools.product 具有任意数量的集合

python itertools 具有绑定值的排列

Python itertools.product 具有可变数量的参数

具有多处理功能的 Python socketio

如何“多处理” itertools 产品模块?