为啥在更多 CPU/内核上的并行化在 Python 中的扩展性如此之差?
Posted
技术标签:
【中文标题】为啥在更多 CPU/内核上的并行化在 Python 中的扩展性如此之差?【英文标题】:Why does parallelization over more CPUs/cores scale so poorly with Python?为什么在更多 CPU/内核上的并行化在 Python 中的扩展性如此之差? 【发布时间】:2021-05-19 07:36:11 【问题描述】:背景
我正在尝试通过使用 Python 3.8 中更多可用内核的并行化(通过 joblib)来加速计算,但观察到它的扩展性确实很差。
试验
我编写了一个小脚本来测试和演示稍后可以找到的行为。该脚本(见下文)旨在让一个完全独立的任务使用 NumPy 和 Pandas 执行一些虚拟操作的迭代。任务没有输入也没有输出,没有磁盘或其他 I/O,也没有任何通信或共享内存,只是简单的 CPU 和 RAM 使用。除了偶尔请求当前时间之外,这些进程不使用任何其他资源。阿姆达尔定律不应该适用于这里的代码,因为除了进程设置之外根本没有通用代码。
我通过使用顺序处理与并行处理复制任务进行了一些增加工作负载的实验,并测量了每次迭代和整个(并行)流程完成所需的时间。我在我的 Windows 10 笔记本电脑和两台 AWS EC2 Linux (Amazon Linux 2) 机器上运行了该脚本。并行处理的数量从未超过可用内核的数量。
观察
我观察到以下情况(详情请参阅稍后的结果,持续时间以秒为单位):
在并行处理的数量少于可用内核数量的情况下,总平均 CPU 利用率 (user
) 从未超过 93%,system
调用不超过 4%,并且没有 @987654329 @(用iostat -hxm 10
测量)
不过,工作负载似乎平均分配在可用内核上,这可能表明进程之间频繁切换,即使有大量可用内核也是如此
有趣的是,对于顺序处理,CPU 利用率 (user
) 约为 48%
所有迭代的总持续时间仅略小于流程的总持续时间,因此流程设置似乎不是主要因素
并行进程数每增加一倍,每次迭代/进程的速度就会降低 50%
而顺序处理的持续时间约为。随着工作量加倍(迭代总数),按预期加倍,
并行处理的持续时间也显着增加了大约。每次翻倍 50%
如此大规模的发现出乎我的意料。
问题
这种行为的原因是什么?
我错过了什么吗?
如何补救才能充分利用使用更多内核的潜力?
详细结果
Windows 10
6 个 CPU,12 个内核
电话:python .\time_parallel_processing.py 1,2,4,8 10
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParallelCount
Joblib 1 4.363902 0.195268 43.673971 10
2 6.322100 0.140654 63.870973 20
4 9.270582 0.464706 93.631790 40
8 15.489000 0.222859 156.670544 80
Seq 1 4.409772 0.126686 44.133441 10
2 4.465326 0.113183 89.377296 20
4 4.534959 0.125097 181.528372 40
8 4.444790 0.083315 355.849860 80
AWS c5.4xlarge
8 个 CPU,16 个内核
电话:python time_parallel_processing.py 1,2,4,8,16 10
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Joblib 1 2.196086 0.009798 21.987626 10
2 3.392873 0.010025 34.297323 20
4 4.519174 0.126054 45.967140 40
8 6.888763 0.676024 71.815990 80
16 12.191278 0.156941 123.287779 160
Seq 1 2.192089 0.010873 21.945536 10
2 2.184294 0.008955 43.735713 20
4 2.201437 0.027537 88.156621 40
8 2.145312 0.009631 171.805374 80
16 2.137723 0.018985 342.393953 160
AWS c5.9xlarge
18 个 CPU,36 个内核
电话:python time_parallel_processing.py 1,2,4,8,16,32 10
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Joblib 1 1.888071 0.023799 18.905295 10
2 2.797132 0.009859 28.307708 20
4 3.349333 0.106755 34.199839 40
8 4.273267 0.705345 45.998927 80
16 6.383214 1.455857 70.469109 160
32 10.974141 4.220783 129.671016 320
Seq 1 1.891170 0.030131 18.934494 10
2 1.866365 0.007283 37.373133 20
4 1.893082 0.041085 75.813468 40
8 1.855832 0.007025 148.643725 80
16 1.896622 0.007573 303.828529 160
32 1.864366 0.009142 597.301383 320
脚本代码
import argparse
import sys
import time
from argparse import Namespace
from typing import List
import numpy as np
import pandas as pd
from joblib import delayed
from joblib import Parallel
from tqdm import tqdm
RESULT_COLUMNS = "Mode": str, "ParCount": int, "ProcessId": int, "IterId": int, "Duration": float
def _create_empty_data_frame() -> pd.DataFrame:
return pd.DataFrame(key: [] for key, _ in RESULT_COLUMNS.items()).astype(RESULT_COLUMNS)
def _do_task() -> None:
for _ in range(10):
array: np.ndarray = np.random.rand(2500, 2500)
_ = np.matmul(array, array)
data_frame: pd.DataFrame = pd.DataFrame(np.random.rand(250, 250), columns=list(map(str, list(range(250)))))
_ = data_frame.merge(data_frame)
def _process(process_id: int, iter_count: int) -> pd.DataFrame:
durations: pd.DataFrame = _create_empty_data_frame()
for i in tqdm(range(iter_count)):
iter_start_time: float = time.time()
_do_task()
durations = durations.append(
"Mode": "",
"ParCount": 0,
"ProcessId": process_id,
"IterId": i,
"Duration": time.time() - iter_start_time,
,
ignore_index=True,
)
return durations
def main(args: Namespace) -> None:
"""Execute main script."""
iter_durations: List[pd.DataFrame] = []
mode_durations: List[pd.DataFrame] = []
for par_count in list(map(int, args.par_counts.split(","))):
total_iter_count: int = par_count * int(args.iter_count)
print(f"\nRunning par_count processes in parallel and total_iter_count iterations in total")
start_time_joblib: float = time.time()
with Parallel(n_jobs=par_count) as parallel:
joblib_durations: List[pd.DataFrame] = parallel(
delayed(_process)(process_id, int(args.iter_count)) for process_id in range(par_count)
)
iter_durations.append(pd.concat(joblib_durations).assign(**"Mode": "Joblib", "ParCount": par_count))
end_time_joblib: float = time.time()
print(f"\nRunning par_count processes sequentially with total_iter_count iterations in total")
start_time_seq: float = time.time()
seq_durations: List[pd.DataFrame] = []
for process_id in range(par_count):
seq_durations.append(_process(process_id, int(args.iter_count)))
iter_durations.append(pd.concat(seq_durations).assign(**"Mode": "Seq", "ParCount": par_count))
end_time_seq: float = time.time()
mode_durations.append(
pd.DataFrame(
"Mode": ["Joblib", "Seq"],
"ParCount": [par_count] * 2,
"Duration": [end_time_joblib - start_time_joblib, end_time_seq - start_time_seq],
"TotalIterCount": [total_iter_count] * 2,
)
)
print("\nDuration in seconds")
grouping_columns: List[str] = ["Mode", "ParCount"]
print(
pd.concat(iter_durations)
.groupby(grouping_columns)
.agg("Duration": ["mean", "std"])
.merge(
pd.concat(mode_durations).groupby(grouping_columns).agg("Duration": ["mean"], "TotalIterCount": "mean"),
on=grouping_columns,
suffixes=["/Iter", " total"],
how="inner",
)
)
if __name__ == "__main__":
print(f"Command line: sys.argv")
parser: argparse.ArgumentParser = argparse.ArgumentParser(description=__doc__)
parser.add_argument(
"par_counts",
help="Comma separated list of parallel processes counts to start trials for (e.g. '1,2,4,8,16,32')",
)
parser.add_argument("iter_count", help="Number of iterations per parallel process to carry out")
args: argparse.Namespace = parser.parse_args()
start_time: float = time.time()
main(args)
print(f"\nTotal elapsed time: time.time() - start_time:.2f seconds")
环境
创建于'conda env create -f environment.time_parallel.yaml
environment.time_parallel.yaml
:
name: time_parallel
channels:
- defaults
- conda-forge
dependencies:
- python=3.8.5
- pip=20.3.3
- pandas=1.2.0
- numpy=1.19.2
- joblib=1.0.0
- tqdm=4.55.1
更新 1
感谢@sholderbach 的评论,我调查了 NumPy/Pandas 的用法并发现了一些事情。
1)
NumPy 使用线性代数后端,它会自动在并行线程中运行一些命令(包括矩阵乘法),这会导致过多的线程完全阻塞系统,并行进程越多,越多,因此每次迭代的持续时间也会增加。
我通过删除方法 _do_task
中的 NumPy 和 Pandas 操作并仅用简单的数学操作替换它来测试这个假设:
def _do_task() -> None:
for _ in range(10):
for i in range(10000000):
_ = 1000 ^ 2 % 200
结果完全符合预期,因为增加进程数(超出可用内核数)时,迭代的持续时间不会改变。
Windows 10
致电python time_parallel_processing.py 1,2,4,8 5
Duration in seconds
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Joblib 1 2.562570 0.015496 13.468393 5
2 2.556241 0.021074 13.781174 10
4 2.565614 0.054754 16.171828 20
8 2.630463 0.258474 20.328055 40
Seq 2 2.576542 0.033270 25.874965 10
AWS c5.9xlarge
致电python time_parallel_processing.py 1,2,4,8,16,32 10
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Joblib 1 2.082849 0.022352 20.854512 10
2 2.126195 0.034078 21.596860 20
4 2.287874 0.254493 27.420978 40
8 2.141553 0.030316 21.912917 80
16 2.156828 0.137937 24.483243 160
32 3.581366 1.197282 42.884399 320
Seq 2 2.076256 0.004231 41.571033 20
2)
按照@sholderbach 的提示,我发现了许多其他链接,这些链接涵盖了自动使用多线程的线性代数后端以及如何关闭它的主题:
NumPy issue(来自@sholderbach)threadpoolctl
package
Nice article
Pinning process to a specific CPU with Python (and package psutil
)
添加到_process
:
proc = psutil.Process()
proc.cpu_affinity([process_id])
with threadpool_limits(limits=1):
...
添加到环境中:
- threadpoolctl=2.1.0
- psutil=5.8.0
注意:我不得不将joblib
替换为multiprocessing
,因为固定在joblib
上无法正常工作(在Linux 上一次只生成了一半的进程)。
我做了一些测试,结果好坏参半。监控显示 pinnng 和每个进程限制为一个线程适用于 Windows 10 和 Linux/AWS c5.9xlarge。不幸的是,每次迭代的绝对持续时间会因这些“修复”而增加。 此外,每次迭代的持续时间仍会在某个并行化点开始增加。
结果如下:
Windows 10
电话:python time_parallel_processing.py 1,2,4,8 5
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Joblib 1 9.502184 0.046554 47.542230 5
2 9.557120 0.092897 49.488612 10
4 9.602235 0.078271 50.249238 20
8 10.518716 0.422020 60.186707 40
Seq 2 9.493682 0.062105 95.083382 10
AWS c5.9xlarge
致电python time_parallel_processing.py 1,2,4,8,16,20,24,28,32 5
Duration/Iter Duration total TotalIterCount
mean std mean mean
Mode ParCount
Parallel 1 5.271010 0.008730 15.862883 3
2 5.400430 0.016094 16.271649 6
4 5.708021 0.069001 17.428172 12
8 6.088623 0.179789 18.745922 24
16 8.330902 0.177772 25.566504 48
20 10.515132 3.081697 47.895538 60
24 13.506221 4.589382 53.348917 72
28 16.318631 4.961513 57.536180 84
32 19.800182 4.435462 64.717435 96
Seq 2 5.212529 0.037129 31.332297 6
【问题讨论】:
你为什么用joblib.Parallel
而不是multiprocessing.Pool
?
三个原因:1)我发现joblib
抽象更容易应用(而且它的酸洗机制更优越,不过这里不关心)2)它应该是一点点更快(参见例如here) 3)我尝试了multiprocessing
,但得到了奇怪的结果,即在 Windows 上运行时间过长,在 Linux 下完全停止(这是另一个故事)
是否使用 MKL 作为 BLAS 库?因为取决于 BLAS 实现,像 matmul 这样的操作可能会调用多核优化代码? github.com/pandas-dev/pandas/issues/23139
那是……一大堆代码和文本。你确定你已经排除了明显的错误,例如数据从/到进程的传输时间大于潜在的加速,或者通过并行访问锤击吞吐量有限的磁盘?
@MisterMiyagi:恕我直言,进程之间没有显着的数据传输,迭代速度也不会受到这种数据传输的影响,但是当增加使用的并行进程的数量时会显示增加。也没有磁盘 I/O。
【参考方案1】:
这种行为的原因是什么?
通常,这种类型的减速通常表示被 GIL 阻止、内核之间的上下文切换或进行大量酸洗
我错过了什么吗?
您可能遗漏了一些小问题 - 尝试分析(某些采样分析器可能比 cProfile 性能更高)以查看时间花在了哪里! 但是,在您重新实施以下建议之前,执行此操作的速度仍然有限
如何补救才能充分利用使用更多内核的潜力?
看看 numba 和 dask,它们可以让您通过 GIL 之外的并行化来极大地加速 numpy 和 pandas 代码
numba 编译 numpy 代码并将其缓存以提高速度和实用的处理器操作
dask 是一个框架,其中包含在单个和多个系统上进行高效并行化的好技巧
【讨论】:
"默认情况下,joblib.Parallel 使用 'loky' 后端模块来启动单独的 Python 工作进程以在不同的 CPU 上同时执行任务。" (请参阅here)每个进程都有自己的、独立的 Python 解释器和自己的、独立的 GIL,因此在这方面不应相互干扰。此外,由于流程之间没有大量数据传输(如果只是在最后),酸洗也不应该成为问题。以上是关于为啥在更多 CPU/内核上的并行化在 Python 中的扩展性如此之差?的主要内容,如果未能解决你的问题,请参考以下文章