如何将多处理用于函数而不是循环?
Posted
技术标签:
【中文标题】如何将多处理用于函数而不是循环?【英文标题】:How to use multiprocessing for a function instead of loop? 【发布时间】:2021-10-09 21:17:46 【问题描述】:我写了一个包含大约 400 行的函数。该函数在数据帧上执行某种数据科学。当我运行该功能时,大约需要 10 秒。我需要在每次迭代中使用不同的参数运行这个函数 100 次。因此在循环中我调用该函数 100 次,并且每次迭代我输入 4 个不同的参数。总共花了大约15分钟。因此我想使用CPU Parallelization
。如何在 python 中使用多处理来提供并行化并改善运行时?
代码示例:
result = []
for i range(100):
result.append(searching_algorithm(a[i], b[i], c[i], d[i]))
【问题讨论】:
那么你的问题是什么?您是否尝试过搜索某些内容。也许multiprocessing
和concurrent.futures
模块的官方文档可能是一个很好的起点。
@Olvin Roght,感谢您的评论,我看到了这些文档,但无法正确地进行多处理。我不知道我应该如何准确地调用多处理函数以及我应该以哪种方式放置参数
你看到但没有读到。我附加到之前评论的每个链接都包含示例部分(1、2)。
【参考方案1】:
您没有说a
、b
、c
和d
是什么类型的列表。这些列表中的元素必须能够使用pickle
模块进行序列化,因为它们需要传递给将由在不同地址空间中运行的进程执行的函数。为了便于论证,我们假设它们是长度至少为 100 的整数列表。
您也没有说明您在哪个平台下运行(Windows?MacOS?Linux?)。当您使用multiprocessing
标记问题时,您还应该使用平台标记问题。您如何组织代码在一定程度上取决于平台。在下面的代码中,我为那些使用spawn
创建新进程的平台(即Windows)选择了最有效的安排。但这在 MacOS 和 Linux 上也很有效,默认情况下使用 fork
创建新进程。您可以研究 spawn
和 fork
在创建新流程时的含义。最终要提高内存和 CPU 效率,您只希望作为 if __name__ == '__main__':
之外的全局变量来阻止那些必须是全局的变量。这就是为什么我要声明一个函数的本地列表。
然后使用我们拥有的concurrent.futures
模块:
from concurrent.futures import ProcessPoolExecutor
def searching_algorithm(a, b, c, d):
...
return a * b * c * d
def main():
# We assume a, b, c and d each have 100 or more elements:
a = list(range(1, 101))
b = list(range(2, 102))
c = list(range(3, 103))
d = list(range(4, 104))
# Use all CPU cores:
with ProcessPoolExecutor() as executor:
result = list(executor.map(searching_algorithm, a[0:100], b[0:100], c[0:100], d[0:100]))
print(result[0], result[-1])
# Required for Windows:
if __name__ == '__main__':
main()
打印:
24 106110600
要改用multiprocessing
模块:
from multiprocessing import Pool
def searching_algorithm(a, b, c, d):
...
return a * b * c * d
def main():
# We assume a, b, c and d each have 100 or more elements:
a = list(range(1, 101))
b = list(range(2, 102))
c = list(range(3, 103))
d = list(range(4, 104))
# Use all CPU cores:
with Pool() as pool:
result = pool.starmap(searching_algorithm, zip(a[0:100], b[0:100], c[0:100], d[0:100]))
print(result[0], result[-1])
# Required for Windows:
if __name__ == '__main__':
main()
在这两个编码示例中,如果列表a
、b
、c
和d
正好包含 100 个元素,则不需要对它们进行切片,例如 a[0:100]
;只需传递列表本身,例如:
result = list(executor.map(searching_algorithm, a, b, c, d))
【讨论】:
我使用的是 Windows 操作系统 我在您的问题中添加了windows
标签(您可以编辑您的问题并自己完成)。多处理是一个大而复杂的话题。但是,如果您查看上面的代码,然后在 Python 文档中查找这些类和方法,希望它会有意义。所以在你“消化”了这个之后,让我知道这是否已经令人满意地回答了你的问题。
非常感谢您的回答,这对我来说非常有用,我按照您在此处编写的代码编写了代码,它有效,但花费的时间比我预期的要多。当我在 CPU 上运行一个函数时大约需要 6 秒,我预计当我对 100 个不同的函数调用应用多处理时,最多需要大约 10 秒。但就我而言,它花了 80 秒。这可能是什么原因?
您需要发布更多代码(searching_algorithm
在做什么以及它使用什么包?)。另外,你有多少个核心?但是如果你在没有多处理的情况下运行它,这将需要 100 * 6 = 600 秒。这是一个相当大的性能改进。逻辑内核和物理内核之间也有区别。我的桌面有 8 个逻辑内核,由 multiprocessing.cpu_count()
或 os.cpu_count()
提供,这将是我的默认池大小。但实际上只有 4 个物理核心。 (更多...)
见Multiprocessing: use only the physical cores?。因此,例如,您可能有 14 个逻辑内核,但时间不会减少 14 倍。但是,我不同意您不应该使用所有逻辑处理器,特别是如果有一些I/O 由您的函数完成。以上是关于如何将多处理用于函数而不是循环?的主要内容,如果未能解决你的问题,请参考以下文章
将多线程和多处理与 concurrent.futures 相结合