为啥我没有看到通过 Python 中的多处理加速?

Posted

技术标签:

【中文标题】为啥我没有看到通过 Python 中的多处理加速?【英文标题】:Why am I not seeing speed up via multiprocessing in Python?为什么我没有看到通过 Python 中的多处理加速? 【发布时间】:2017-10-16 03:28:26 【问题描述】:

我正在尝试并行化一个令人尴尬的并行 for 循环 (previously asked here) 并选择适合我的参数的 this implementation:

    with Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets, train_size_common, feat_sel_size, train_perc,
                                            total_test_samples, num_classes, num_features, label_set,
                                            method_names, pos_class_index, out_results_dir, exhaustive_search])
        partial_func_holdout = partial(holdout_trial_compare_datasets, *shared_inputs)

        with Pool(processes=num_procs) as pool:
            cv_results = pool.map(partial_func_holdout, range(num_repetitions))

我需要使用proxy object(在进程之间共享)的原因是共享代理列表datasets 中的第一个元素,它是一个大对象列表(每个大约200-300MB)。这个datasets 列表通常有 5-25 个元素。我通常需要在 HPC 集群上运行这个程序。

问题是,当我使用 32 个进程和 50GB 内存(num_repetitions=200,数据集是 10 个对象的列表,每个 250MB)运行这个程序时,我看不到加速 16 倍(具有 32 个并行进程)。我不明白为什么 - 任何线索?有什么明显的错误或错误的选择吗?我在哪里可以改进这个实现?有其他选择吗?

我相信这个问题之前已经讨论过,原因可能多种多样,并且非常具体到实施 - 因此我请求你提供你的 2 美分。谢谢。

更新:我使用 cProfile 进行了一些分析以获得更好的想法 - 这是一些按累积时间排序的信息。

In [19]: p.sort_stats('cumulative').print_stats(50)
Mon Oct 16 16:43:59 2017    profiling_log.txt

         555404 function calls (543552 primitive calls) in 662.201 seconds

   Ordered by: cumulative time
   List reduced from 4510 to 50 due to restriction <50>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
    897/1    0.044    0.000  662.202  662.202 built-in method builtins.exec
        1    0.000    0.000  662.202  662.202 test_rhst.py:2(<module>)
        1    0.001    0.001  661.341  661.341 test_rhst.py:70(test_chance_classifier_binary)
        1    0.000    0.000  661.336  661.336 /Users/Reddy/dev/neuropredict/neuropredict/rhst.py:677(run)
        4    0.000    0.000  661.233  165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:533(wait)
        4    0.000    0.000  661.233  165.308 /Users/Reddy/anaconda/envs/py36/lib/python3.6/threading.py:263(wait)
       23  661.233   28.749  661.233   28.749 method 'acquire' of '_thread.lock' objects
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:261(map)
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:637(get)
        1    0.000    0.000  661.233  661.233 /Users/Reddy/anaconda/envs/py36/lib/python3.6/multiprocessing/pool.py:634(wait)
    866/8    0.004    0.000    0.868    0.108 <frozen importlib._bootstrap>:958(_find_and_load)
    866/8    0.003    0.000    0.867    0.108 <frozen importlib._bootstrap>:931(_find_and_load_unlocked)
    720/8    0.003    0.000    0.865    0.108 <frozen importlib._bootstrap>:641(_load_unlocked)
    596/8    0.002    0.000    0.865    0.108 <frozen importlib._bootstrap_external>:672(exec_module)
   1017/8    0.001    0.000    0.863    0.108 <frozen importlib._bootstrap>:197(_call_with_frames_removed)
   522/51    0.001    0.000    0.765    0.015 built-in method builtins.__import__

分析信息现在按time排序

In [20]: p.sort_stats('time').print_stats(20)
Mon Oct 16 16:43:59 2017    profiling_log.txt

         555404 function calls (543552 primitive calls) in 662.201 seconds

   Ordered by: internal time
   List reduced from 4510 to 20 due to restriction <20>

   ncalls  tottime  percall  cumtime  percall filename:lineno(function)
       23  661.233   28.749  661.233   28.749 method 'acquire' of '_thread.lock' objects
   115/80    0.177    0.002    0.211    0.003 built-in method _imp.create_dynamic
      595    0.072    0.000    0.072    0.000 built-in method marshal.loads
        1    0.045    0.045    0.045    0.045 method 'acquire' of '_multiprocessing.SemLock' objects
    897/1    0.044    0.000  662.202  662.202 built-in method builtins.exec
        3    0.042    0.014    0.042    0.014 method 'read' of '_io.BufferedReader' objects
2037/1974    0.037    0.000    0.082    0.000 built-in method builtins.__build_class__
      286    0.022    0.000    0.061    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:12(docformat)
     2886    0.021    0.000    0.021    0.000 built-in method posix.stat
       79    0.016    0.000    0.016    0.000 built-in method posix.read
      597    0.013    0.000    0.021    0.000 <frozen importlib._bootstrap_external>:830(get_data)
      276    0.011    0.000    0.013    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/sre_compile.py:250(_optimize_charset)
      108    0.011    0.000    0.038    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/stats/_distn_infrastructure.py:626(_construct_argparser)
     1225    0.011    0.000    0.050    0.000 <frozen importlib._bootstrap_external>:1233(find_spec)
     7179    0.009    0.000    0.009    0.000 method 'splitlines' of 'str' objects
       33    0.008    0.000    0.008    0.000 built-in method posix.waitpid
      283    0.008    0.000    0.015    0.000 /Users/Reddy/anaconda/envs/py36/lib/python3.6/site-packages/scipy/misc/doccer.py:128(indentcount_lines)
        3    0.008    0.003    0.008    0.003 method 'poll' of 'select.poll' objects
     7178    0.008    0.000    0.008    0.000 method 'expandtabs' of 'str' objects
      597    0.007    0.000    0.007    0.000 method 'read' of '_io.FileIO' objects

更多按percall 排序的分析信息:

更新 2

我之前提到的大列表datasets 中的元素通常没有那么大——它们通常每个10-25MB。但是根据使用的浮点精度、样本数量和特征,每个元素也可以轻松增长到 500MB-1GB。因此我更喜欢可以扩展的解决方案。

更新 3:

holdout_trial_compare_datasets 中的代码使用 scikit-learn 的 GridSearchCV 方法,如果我们设置 n_jobs > 1(或者无论何时设置它),它都会在内部使用 joblib 库。这可能会导致多处理和 joblib 之间的一些不良交互。所以尝试另一个我根本没有设置 n_jobs 的配置(这应该默认在 scikit-learn 中没有并行性)。会及时通知您。

【问题讨论】:

你做过分析吗? 还没有,因为我想测试它的参数(16-32 个进程,10-15 个数据集)要求我在集群上运行它,我不知道如何在命令行上配置 python 程序。我会尽快调查的。 我的 2¢:如果你的大数据对象只从父级传递给子级,Manager 似乎有点矫枉过正,你可以将它加载到父级的全局变量中,然后它将与子级共享fork(). 您似乎在分析父进程,而不是子工作者? 你可以使用a decorator to profile你的partial_func_holdout函数。或者使用这个:gist.github.com/nealtodd/2489618 【参考方案1】:

根据cmets中的讨论,我做了一个小实验,比较了三个版本的实现:

v1:和你的方法基本一样,其实partial(f1, *shared_inputs)会立即解包proxy_manager.listManager.List这里不涉及,数据通过Pool的内部队列传递给worker。 v2:v2 使用了Manager.List,工作函数将接收ListProxy 对象,它通过与服务器进程的内部连接获取共享数据。 v3:子进程从父进程共享数据,利用fork(2) 系统调用。
def f1(*args):
    for e in args[0]: pow(e, 2)

def f2(*args):
    for e in args[0][0]: pow(e, 2)

def f3(n):
    for i in datasets: pow(i, 2)

def v1(np):
    with mp.Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets,])
        pf = partial(f1, *shared_inputs)
        with mp.Pool(processes=np) as pool:
            r = pool.map(pf, range(16))

def v2(np):
    with mp.Manager() as proxy_manager:
        shared_inputs = proxy_manager.list([datasets,])
        pf = partial(f2, shared_inputs)
        with mp.Pool(processes=np) as pool:
            r = pool.map(pf, range(16))

def v3(np):
    with mp.Pool(processes=np) as pool:
        r = pool.map(f3, range(16))

datasets = [2.0 for _ in range(10 * 1000 * 1000)]
for f in (v1, v2, v3):
    print(f.__code__.co_name)
    for np in (2, 4, 8, 16):
        s = time()
        f(np)
        print("%s %.2fs" % (np, time()-s))

在 16 核 E5-2682 VPC 上取得的结果,很明显 v3 的扩展性更好:

【讨论】:

谢谢乔治 - 真的帮助我更接近理解问题。我真的很好奇 *shared_inputs 立即拆包 - 我没有意识到会发生这种情况......当我们应用 functools.partial 时也会发生类似的事情,因为它尝试冻结输入和签名,对吗?因此,无论是否解包,数据都必须通过酸洗传递给子进程,对吧?这意味着,我们唯一可以避免的是将它们用作全局变量并从子进程中访问它们?这种方法有什么危险吗?在子进程中不会对输入进行任何更改。 作为*shared_inputspartial 生效之前的解压列表,它“冻结”(代理)列表包含的对象,而不是代理本身,v2 它是使用Manager.List 部分的示例;是的,数据得到了pickle/unpickle来传递;在您的问题和工作量的上下文中,没有什么可担心的,如果您传递与外部资源相关的东西,如套接字对象、文件描述符,您需要更加小心。 感谢 George 的帮助 - 我觉得我们需要更好地建立瓶颈 .. 尤其是大型对象(20MB 的 10 个元素)和复杂的计算(比对常规数字网格求平方要求更高的东西) ,因为这可能会触发内部记忆).. 你的 cmets 非常有帮助,我希望能解决这个问题。【参考方案2】:
method 'acquire' of '_thread.lock' objects

查看您的分析器输出,我会说共享对象锁定/解锁开销压倒了多线程的速度增益。

重构,以便将工作外包给不需要相互交谈的工人。

具体来说,如果可能的话,为每个数据堆推导出一个答案,然后对累积的结果采取行动。

这就是队列看起来快得多的原因:它们涉及一种不需要必须“管理”对象并因此锁定/解锁的工作。

仅“管理”绝对需要在进程之间共享的事物。您的托管列表包含一些看起来非常复杂的对象...

更快的范例是:

allwork = manager.list([a, b,c])
theresult = manager.list()

然后

while mywork:
    unitofwork = allwork.pop()
    theresult = myfunction(unitofwork)

【讨论】:

感谢您的建议和回复。事实上,不同进程中的工作并不需要全部相互交谈,即一旦他们访问共享数据列表shared_inputs,工作单元内的所有内容都是独立于其他的。 我不确定我实施的建议与您的建议有何不同。您能否尝试详细说明,或者写一个更详细的实现来代替我在顶部显示的内容?太好了,谢谢。【参考方案3】:

如果您不需要复杂的共享对象,则只使用可以想象的最简单对象的列表。

然后告诉工人获取他们可以在自己的小世界中处理的复杂数据。

试试:

allwork = manager.list([datasetid1, datasetid2 ,...])
theresult = manager.list()

while mywork:
    unitofworkid = allwork.pop()
    theresult = myfunction(unitofworkid)

def myfunction(unitofworkid):
    thework = acquiredataset(unitofworkid)
    result = holdout_trial_compare_datasets(thework, ...)

我希望这是有道理的。在这个方向上重构应该不会花费太多时间。当您进行分析时,您应该会看到 method 'acquire' of '_thread.lock' objects 的数量像石头一样下降。

【讨论】:

谢谢,我需要考虑一下。在我的情况下,这将导致巨大的 I/O - 但需要考虑与实际并行计算相比,它是否只是很小的一部分时间......

以上是关于为啥我没有看到通过 Python 中的多处理加速?的主要内容,如果未能解决你的问题,请参考以下文章

为啥 C# 中的多线程不能达到 100% CPU?

知乎为啥使用Tornado?使用Python中的多线程特性了吗

python中的多线程为啥会报错?

知乎为啥使用Tornado?使用Python中的多线程特性了吗

是否可以使用多线程加速脚本?

为啥带有酸洗的多处理序列化取决于范围?