为啥在更多 CPU/内核上的并行化在 Python 中的扩展性如此之差?

Posted

技术标签:

【中文标题】为啥在更多 CPU/内核上的并行化在 Python 中的扩展性如此之差?【英文标题】:Why does parallelization over more CPUs/cores scale so poorly with Python?为什么在更多 CPU/内核上的并行化在 Python 中的扩展性如此之差? 【发布时间】:2021-05-19 07:36:11 【问题描述】:

背景

我正在尝试通过使用 Python 3.8 中更多可用内核的并行化(通过 joblib)来加速计算,但观察到它的扩展性确实很差。

试验

我编写了一个小脚本来测试和演示稍后可以找到的行为。该脚本(见下文)旨在让一个完全独立的任务使用 NumPy 和 Pandas 执行一些虚拟操作的迭代。任务没有输入也没有输出,没有磁盘或其他 I/O,也没有任何通信或共享内存,只是简单的 CPU 和 RAM 使用。除了偶尔请求当前时间之外,这些进程不使用任何其他资源。阿姆达尔定律不应该适用于这里的代码,因为除了进程设置之外根本没有通用代码。

我通过使用顺序处理与并行处理复制任务进行了一些增加工作负载的实验,并测量了每次迭代和整个(并行)流程完成所需的时间。我在我的 Windows 10 笔记本电脑和两台 AWS EC2 Linux (Amazon Linux 2) 机器上运行了该脚本。并行处理的数量从未超过可用内核的数量。

观察

我观察到以下情况(详情请参阅稍后的结果,持续时间以秒为单位):

在并行处理的数量少于可用内核数量的情况下,总平均 CPU 利用率 (user) 从未超过 93%,system 调用不超过 4%,并且没有 @987654329 @(用iostat -hxm 10测量) 不过,工作负载似乎平均分配在可用内核上,这可能表明进程之间频繁切换,即使有大量可用内核也是如此 有趣的是,对于顺序处理,CPU 利用率 (user) 约为 48% 所有迭代的总持续时间仅略小于流程的总持续时间,因此流程设置似乎不是主要因素 并行进程数每增加一倍,每次迭代/进程的速度就会降低 50% 而顺序处理的持续时间约为。随着工作量加倍(迭代总数),按预期加倍, 并行处理的持续时间也显着增加了大约。每次翻倍 50%

如此大规模的发现出乎我的意料。

问题

这种行为的原因是什么?

我错过了什么吗?

如何补救才能充分利用使用更多内核的潜力?

详细结果

Windows 10

6 个 CPU,12 个内核 电话:python .\time_parallel_processing.py 1,2,4,8 10

                     Duration/Iter            Duration total TotalIterCount
                              mean        std           mean           mean
Mode   ParallelCount
Joblib 1                  4.363902   0.195268      43.673971             10
       2                  6.322100   0.140654      63.870973             20
       4                  9.270582   0.464706      93.631790             40
       8                 15.489000   0.222859     156.670544             80
Seq    1                  4.409772   0.126686      44.133441             10
       2                  4.465326   0.113183      89.377296             20
       4                  4.534959   0.125097     181.528372             40
       8                  4.444790   0.083315     355.849860             80

AWS c5.4xlarge

8 个 CPU,16 个内核 电话:python time_parallel_processing.py 1,2,4,8,16 10

                Duration/Iter           Duration total TotalIterCount
                         mean       std           mean           mean
Mode   ParCount
Joblib 1             2.196086  0.009798      21.987626             10
       2             3.392873  0.010025      34.297323             20
       4             4.519174  0.126054      45.967140             40
       8             6.888763  0.676024      71.815990             80
       16           12.191278  0.156941     123.287779            160
Seq    1             2.192089  0.010873      21.945536             10
       2             2.184294  0.008955      43.735713             20
       4             2.201437  0.027537      88.156621             40
       8             2.145312  0.009631     171.805374             80
       16            2.137723  0.018985     342.393953            160

AWS c5.9xlarge

18 个 CPU,36 个内核 电话:python time_parallel_processing.py 1,2,4,8,16,32 10

                Duration/Iter           Duration total TotalIterCount
                         mean       std           mean           mean
Mode   ParCount
Joblib 1             1.888071  0.023799      18.905295             10
       2             2.797132  0.009859      28.307708             20
       4             3.349333  0.106755      34.199839             40
       8             4.273267  0.705345      45.998927             80
       16            6.383214  1.455857      70.469109            160
       32           10.974141  4.220783     129.671016            320
Seq    1             1.891170  0.030131      18.934494             10
       2             1.866365  0.007283      37.373133             20
       4             1.893082  0.041085      75.813468             40
       8             1.855832  0.007025     148.643725             80
       16            1.896622  0.007573     303.828529            160
       32            1.864366  0.009142     597.301383            320

脚本代码

import argparse
import sys
import time
from argparse import Namespace
from typing import List
import numpy as np
import pandas as pd
from joblib import delayed
from joblib import Parallel
from tqdm import tqdm

RESULT_COLUMNS = "Mode": str, "ParCount": int, "ProcessId": int, "IterId": int, "Duration": float

def _create_empty_data_frame() -> pd.DataFrame:
    return pd.DataFrame(key: [] for key, _ in RESULT_COLUMNS.items()).astype(RESULT_COLUMNS)

def _do_task() -> None:
    for _ in range(10):
        array: np.ndarray = np.random.rand(2500, 2500)
        _ = np.matmul(array, array)
        data_frame: pd.DataFrame = pd.DataFrame(np.random.rand(250, 250), columns=list(map(str, list(range(250)))))
        _ = data_frame.merge(data_frame)

def _process(process_id: int, iter_count: int) -> pd.DataFrame:
    durations: pd.DataFrame = _create_empty_data_frame()
    for i in tqdm(range(iter_count)):
        iter_start_time: float = time.time()
        _do_task()
        durations = durations.append(
            
                "Mode": "",
                "ParCount": 0,
                "ProcessId": process_id,
                "IterId": i,
                "Duration": time.time() - iter_start_time,
            ,
            ignore_index=True,
        )
    return durations

def main(args: Namespace) -> None:
    """Execute main script."""
    iter_durations: List[pd.DataFrame] = []
    mode_durations: List[pd.DataFrame] = []

    for par_count in list(map(int, args.par_counts.split(","))):
        total_iter_count: int = par_count * int(args.iter_count)

        print(f"\nRunning par_count processes in parallel and total_iter_count iterations in total")
        start_time_joblib: float = time.time()
        with Parallel(n_jobs=par_count) as parallel:
            joblib_durations: List[pd.DataFrame] = parallel(
                delayed(_process)(process_id, int(args.iter_count)) for process_id in range(par_count)
            )
        iter_durations.append(pd.concat(joblib_durations).assign(**"Mode": "Joblib", "ParCount": par_count))
        end_time_joblib: float = time.time()

        print(f"\nRunning par_count processes sequentially with total_iter_count iterations in total")
        start_time_seq: float = time.time()
        seq_durations: List[pd.DataFrame] = []
        for process_id in range(par_count):
            seq_durations.append(_process(process_id, int(args.iter_count)))
        iter_durations.append(pd.concat(seq_durations).assign(**"Mode": "Seq", "ParCount": par_count))
        end_time_seq: float = time.time()

        mode_durations.append(
            pd.DataFrame(
                
                    "Mode": ["Joblib", "Seq"],
                    "ParCount": [par_count] * 2,
                    "Duration": [end_time_joblib - start_time_joblib, end_time_seq - start_time_seq],
                    "TotalIterCount": [total_iter_count] * 2,
                
            )
        )

    print("\nDuration in seconds")
    grouping_columns: List[str] = ["Mode", "ParCount"]
    print(
        pd.concat(iter_durations)
        .groupby(grouping_columns)
        .agg("Duration": ["mean", "std"])
        .merge(
            pd.concat(mode_durations).groupby(grouping_columns).agg("Duration": ["mean"], "TotalIterCount": "mean"),
            on=grouping_columns,
            suffixes=["/Iter", " total"],
            how="inner",
        )
    )

if __name__ == "__main__":
    print(f"Command line: sys.argv")
    parser: argparse.ArgumentParser = argparse.ArgumentParser(description=__doc__)
    parser.add_argument(
        "par_counts",
        help="Comma separated list of parallel processes counts to start trials for (e.g. '1,2,4,8,16,32')",
    )
    parser.add_argument("iter_count", help="Number of iterations per parallel process to carry out")
    args: argparse.Namespace = parser.parse_args()

    start_time: float = time.time()
    main(args)
    print(f"\nTotal elapsed time: time.time() - start_time:.2f seconds")

环境

创建于'conda env create -f environment.time_parallel.yaml environment.time_parallel.yaml:

name: time_parallel
channels:
  - defaults
  - conda-forge
dependencies:
  - python=3.8.5
  - pip=20.3.3
  - pandas=1.2.0
  - numpy=1.19.2
  - joblib=1.0.0
  - tqdm=4.55.1

更新 1

感谢@sholderbach 的评论,我调查了 NumPy/Pandas 的用法并发现了一些事情。

1)

NumPy 使用线性代数后端,它会自动在并行线程中运行一些命令(包括矩阵乘法),这会导致过多的线程完全阻塞系统,并行进程越多,越多,因此每次迭代的持续时间也会增加。 我通过删除方法 _do_task 中的 NumPy 和 Pandas 操作并仅用简单的数学操作替换它来测试这个假设:

def _do_task() -> None:
    for _ in range(10):
        for i in range(10000000):
            _ = 1000 ^ 2 % 200   

结果完全符合预期,因为增加进程数(超出可用内核数)时,迭代的持续时间不会改变。

Windows 10

致电python time_parallel_processing.py 1,2,4,8 5

Duration in seconds
                  Duration/Iter           Duration total TotalIterCount
                           mean       std           mean           mean
Mode     ParCount
Joblib   1             2.562570  0.015496      13.468393              5
         2             2.556241  0.021074      13.781174             10
         4             2.565614  0.054754      16.171828             20
         8             2.630463  0.258474      20.328055             40
Seq      2             2.576542  0.033270      25.874965             10

AWS c5.9xlarge

致电python time_parallel_processing.py 1,2,4,8,16,32 10

                Duration/Iter           Duration total TotalIterCount
                         mean       std           mean           mean
Mode   ParCount
Joblib 1             2.082849  0.022352      20.854512             10
       2             2.126195  0.034078      21.596860             20
       4             2.287874  0.254493      27.420978             40
       8             2.141553  0.030316      21.912917             80
       16            2.156828  0.137937      24.483243            160
       32            3.581366  1.197282      42.884399            320
Seq    2             2.076256  0.004231      41.571033             20

2)

按照@sholderbach 的提示,我发现了许多其他链接,这些链接涵盖了自动使用多线程的线性代数后端以及如何关闭它的主题:

NumPy issue(来自@sholderbach) threadpoolctl package Nice article Pinning process to a specific CPU with Python (and package psutil)

添加到_process:

proc = psutil.Process()
proc.cpu_affinity([process_id])
with threadpool_limits(limits=1):   
   ...

添加到环境中:

- threadpoolctl=2.1.0
- psutil=5.8.0

注意:我不得不将joblib 替换为multiprocessing,因为固定在joblib 上无法正常工作(在Linux 上一次只生成了一半的进程)。

我做了一些测试,结果好坏参半。监控显示 pinnng 和每个进程限制为一个线程适用于 Windows 10 和 Linux/AWS c5.9xlarge。不幸的是,每次迭代的绝对持续时间会因这些“修复”而增加。 此外,每次迭代的持续时间仍会在某个并行化点开始增加。

结果如下:

Windows 10 电话:python time_parallel_processing.py 1,2,4,8 5

                Duration/Iter           Duration total TotalIterCount
                         mean       std           mean           mean
Mode   ParCount
Joblib 1             9.502184  0.046554      47.542230              5
       2             9.557120  0.092897      49.488612             10
       4             9.602235  0.078271      50.249238             20
       8            10.518716  0.422020      60.186707             40
Seq    2             9.493682  0.062105      95.083382             10

AWS c5.9xlarge 致电python time_parallel_processing.py 1,2,4,8,16,20,24,28,32 5

                  Duration/Iter           Duration total TotalIterCount
                           mean       std           mean           mean
Mode     ParCount
Parallel 1             5.271010  0.008730      15.862883              3
         2             5.400430  0.016094      16.271649              6
         4             5.708021  0.069001      17.428172             12
         8             6.088623  0.179789      18.745922             24
         16            8.330902  0.177772      25.566504             48
         20           10.515132  3.081697      47.895538             60
         24           13.506221  4.589382      53.348917             72
         28           16.318631  4.961513      57.536180             84            
         32           19.800182  4.435462      64.717435             96
Seq      2             5.212529  0.037129      31.332297              6

【问题讨论】:

你为什么用joblib.Parallel而不是multiprocessing.Pool 三个原因:1)我发现joblib 抽象更容易应用(而且它的酸洗机制更优越,不过这里不关心)2)它应该是一点点更快(参见例如here) 3)我尝试了multiprocessing,但得到了奇怪的结果,即在 Windows 上运行时间过长,在 Linux 下完全停止(这是另一个故事) 是否使用 MKL 作为 BLAS 库?因为取决于 BLAS 实现,像 matmul 这样的操作可能会调用多核优化代码? github.com/pandas-dev/pandas/issues/23139 那是……一大堆代码和文本。你确定你已经排除了明显的错误,例如数据从/到进程的传输时间大于潜在的加速,或者通过并行访问锤击吞吐量有限的磁盘? @MisterMiyagi:恕我直言,进程之间没有显着的数据传输,迭代速度也不会受到这种数据传输的影响,但是当增加使用的并行进程的数量时会显示增加。也没有磁盘 I/O。 【参考方案1】:

这种行为的原因是什么?

通常,这种类型的减速通常表示被 GIL 阻止、内核之间的上下文切换或进行大量酸洗

我错过了什么吗?

您可能遗漏了一些小问题 - 尝试分析(某些采样分析器可能比 cProfile 性能更高)以查看时间花在了哪里! 但是,在您重新实施以下建议之前,执行此操作的速度仍然有限​​

如何补救才能充分利用使用更多内核的潜力?

看看 numba 和 dask,它们可以让您通过 GIL 之外的并行化来极大地加速 numpy 和 pandas 代码

numba 编译 numpy 代码并将其缓存以提高速度和实用的处理器操作

dask 是一个框架,其中包含在单个和多个系统上进行高效并行化的好技巧

【讨论】:

"默认情况下,joblib.Parallel 使用 'loky' 后端模块来启动单独的 Python 工作进程以在不同的 CPU 上同时执行任务。" (请参阅here)每个进程都有自己的、独立的 Python 解释器和自己的、独立的 GIL,因此在这方面不应相互干扰。此外,由于流程之间没有大量数据传输(如果只是在最后),酸洗也不应该成为问题。

以上是关于为啥在更多 CPU/内核上的并行化在 Python 中的扩展性如此之差?的主要内容,如果未能解决你的问题,请参考以下文章

为啥使用更多线程会导致运行时间变慢?

GPU是并行计算,CPU是串行计算?为啥这么说?

如何使 numba @jit 使用所有 cpu 内核(并行化 numba @jit)

为啥在Python里推荐使用多进程而不是多线程

运行多个 python 程序时内核 CPU 高

使用 C 和并行化在 R 中快速关联