为啥多处理比 Pandas 中的简单计算慢?

Posted

技术标签:

【中文标题】为啥多处理比 Pandas 中的简单计算慢?【英文标题】:why is multiprocessing slower than a simple computation in Pandas?为什么多处理比 Pandas 中的简单计算慢? 【发布时间】:2018-09-25 00:37:37 【问题描述】:

这与how to parallelize many (fuzzy) string comparisons using apply in Pandas?有关

再次考虑这个简单(但有趣)的例子:

import dask.dataframe as dd
import dask.multiprocessing
import dask.threaded
from fuzzywuzzy import fuzz
import pandas as pd

master= pd.DataFrame('original':['this is a nice sentence',
'this is another one',
'*** is nice'])

slave= pd.DataFrame('name':['hello world',
'congratulations',
'this is a nice sentence ',
'this is another one',
'*** is nice'],'my_value': [1,2,3,4,5])

def fuzzy_score(str1, str2):
    return fuzz.token_set_ratio(str1, str2)

def helper(orig_string, slave_df):
    slave_df['score'] = slave_df.name.apply(lambda x: fuzzy_score(x,orig_string))
    #return my_value corresponding to the highest score
    return slave_df.loc[slave_df.score.idxmax(),'my_value']

master
Out[39]: 
                  original
0  this is a nice sentence
1      this is another one
2    *** is nice

slave
Out[40]: 
   my_value                      name
0         1               hello world
1         2           congratulations
2         3  this is a nice sentence 
3         4       this is another one
4         5     *** is nice

我需要做的很简单:

对于master 中的每一行,我使用fuzzywuzzy 计算的字符串相似度得分在Dataframe slave 中查找最佳匹配。

现在让我们把这些数据框放大一点:

master = pd.concat([master] * 100,  ignore_index  = True)
slave = pd.concat([slave] * 10,  ignore_index  = True)

首先,我尝试过dask

#prepare the computation
dmaster = dd.from_pandas(master, npartitions=4)
dmaster['my_value'] = dmaster.original.apply(lambda x: helper(x, slave),meta=('x','f8'))

现在是时间安排:

#multithreaded
%timeit dmaster.compute(get=dask.threaded.get) 
1 loop, best of 3: 346 ms per loop

#multiprocess
%timeit dmaster.compute(get=dask.multiprocessing.get) 
1 loop, best of 3: 1.93 s per loop

#good 'ol pandas
%timeit master['my_value'] = master.original.apply(lambda x: helper(x,slave))
100 loops, best of 3: 2.18 ms per loop

其次,我尝试过使用旧的 multiprocessing

from multiprocessing import Pool, cpu_count

def myfunc(df):
    return df.original.apply(lambda x: helper(x, slave))

from datetime import datetime

if __name__ == '__main__':
     startTime = datetime.now()
     p = Pool(cpu_count() - 1)
     ret_list = p.map(myfunc, [master.iloc[1:100,], master.iloc[100:200 ,],
                               master.iloc[200:300 ,]])
     results = pd.concat(ret_list)
     print datetime.now() - startTime

大约在同一时间

runfile('C:/Users/john/untitled6.py', wdir='C:/Users/john')
0:00:01.927000

问题:为什么与这里的 Pandas 相比,Daskmultiprocessing 的多处理速度如此之慢?假设我的真实数据比这大得多。我能得到更好的结果吗?

毕竟我这里考虑的问题是embarassingly parallel(每一行都是一个独立的问题),所以这些包应该真的很亮眼。

我错过了什么吗?

谢谢!

【问题讨论】:

我认为这是因为您需要更多的计算(水平扩展)能力来证明 dask 的所有额外开销是合理的。 还不够! numpy 也能做到,你需要更多的 computers :D 第一次调用 multiprocessing 需要时间来启动额外的进程,然后才能开始计算任何东西。 如果您的所有数据都可以轻松放入内存中,那么 dask 不一定是正确的选择。哦,您应该使用分布式调度程序,即使在单台机器上也是如此。 接下来,您需要考虑与序列化数据、将其发送到进程、反序列化以及返回输出。除此之外,还有管理任务执行的开销。 这听起来像是答案吗?一些讨论distributed.readthedocs.io/en/latest/efficiency.html 【参考方案1】:

让我将我制作的 cmets 总结为类似答案的内容。我希望这些信息对您有用,因为这里汇集了许多问题。

首先,我想向您指出 Distributed.readthedocs.io/en/latest/efficiency.html ,其中讨论了许多 dask 性能主题。请注意,这都是关于分布式调度程序的,但由于它可以在进程内、线程或进程或它们的组合中启动,它确实取代了以前的 dask 调度程序,并且通常在所有情况下都推荐使用。

1) 创建流程需要时间。这总是正确的,尤其是在 Windows 上。如果您对现实生活中的性能感兴趣,您将希望只创建一次流程,并使用其固定开销并运行许多任务。在 dask 中,有 many ways 制作您的集群,甚至是在本地。

2) dask(或任何其他调度程序)处理的每个任务都会产生一些开销。在分布式调度器的情况下,这是

3) 在客户端加载整个数据集并将其传递给工作人员是 dask 中的一种反模式。相反,您希望使用像 dask.dataframe.read_csv 这样的函数,其中数据由工作人员加载,避免昂贵的序列化和进程间通信。 Dask 非常擅长将计算转移到数据所在的位置,从而最大限度地减少通信。

4)当进程之间的通信时,序列化的方法很重要,这就是我猜测为什么 non-dask 多处理对你来说这么慢。

5) 最后,并非所有工作都能在黑暗中获得绩效提升。这取决于很多事情,但通常主要的是:数据是否适合内存?如果是,可能很难匹配 numpy 和 pandas 中优化良好的方法。与往常一样,您应该始终分析您的代码...

【讨论】:

非常好。您可能会尝试使您的问题更具信息性的一件事是找到这样的阈值。也就是说,您能否尝试在具有足够大数据帧的机器上运行我的代码,以便 dask 或 multiprocessing 更快?那太棒了 我没有你的数据:|此外,“阈值”将取决于大量变量,因此最好遵守此处描述的经验法则。 不,但我的意思是上面的工作示例。您可以改变示例数据框的大小 参见上面的 3),你不应该在一个进程中创建数据并将其传递给工作人员,在这种情况下,序列化/通信需要的时间比处理时间长,而且你永远不会赢。 我明白了,非常感谢。但这对dask 来说是非常具体的。在您看来,这是否也适用于使用multiprocessing?再次感谢!

以上是关于为啥多处理比 Pandas 中的简单计算慢?的主要内容,如果未能解决你的问题,请参考以下文章

为啥并行多线程代码执行比顺序慢?

Pandas中数据的处理

为啥每次调用 Spring MVC 服务中的简单方法都比静态方法慢?

为啥使用numpy和pandas来进行数据处理?

为啥 Apache-Spark - Python 在本地比 pandas 慢?

iOS Metal 计算管道比搜索任务的 CPU 实现慢