在 pandas groupby 之后并行化应用
Posted
技术标签:
【中文标题】在 pandas groupby 之后并行化应用【英文标题】:Parallelize apply after pandas groupby 【发布时间】:2014-11-29 00:46:19 【问题描述】:我在groupby
之后使用rosetta.parallel.pandas_easy
并行化apply
,例如:
from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame('a': [6, 2, 2], 'b': [4, 5, 6],index= ['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)
但是,有没有人想出如何并行化返回 DataFrame 的函数?正如预期的那样,此代码对于 rosetta
失败。
def tmpFunc(df):
df['c'] = df.a + df.b
return df
df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)
【问题讨论】:
【参考方案1】:编辑:为了在 pandas groupby
上获得更好的计算性能,您可以使用 numba 在运行时将您的代码编译成 C 代码并以 C 速度运行。如果你在groupby
之后应用的函数是纯numpy
计算,它会超级快(比这个并行化快很多)。
您可以使用multiprocessing
或joblib
来实现并行化。但是,如果组的数量很大并且每个组的DataFrame很大,则运行时间可能会更糟,因为您需要将这些组多次转移到CPU中。为了减少开销,我们可以先将数据分成大块,然后在这些块上并行计算。
例如,假设您正在处理股票数据,您需要按代码对股票进行分组,然后计算一些统计数据。您可以先按代码的第一个字符(大块)进行分组,然后在这个虚拟组中执行操作:
import pandas as pd
from joblib import Parallel, delayed
def group_func(dummy_group):
# Do something to the group just like doing to the original dataframe.
# Example: calculate daily return.
res = []
for _, g in dummy_group.groupby('code'):
g['daily_return'] = g.close / g.close.shift(1)
res.append(g)
return pd.concat(res)
stock_data = stock_data.assign(dummy=stock_data['code'].str[0])
Parallel(n_jobs=-1)(delayed(group_func)(group) for _, group in stock_data.groupby('dummy'))
【讨论】:
【参考方案2】:Ivan 的回答很棒,不过看起来可以稍微简化一下,也不需要依赖joblib:
from multiprocessing import Pool, cpu_count
def applyParallel(dfGrouped, func):
with Pool(cpu_count()) as p:
ret_list = p.map(func, [group for name, group in dfGrouped])
return pandas.concat(ret_list)
顺便说一句:这不能代替 any groupby.apply(),但它会涵盖典型情况:例如它应该涵盖案例 2 和案例 3 in the documentation,而您应该通过将参数 axis=1
提供给最终的 pandas.concat()
调用来获得案例 1 的行为。
编辑: 文档已更改;旧版本可以在here找到,无论如何我复制粘贴下面的三个例子。
case 1: group DataFrame apply aggregation function (f(chunk) -> Series) yield DataFrame, with group axis having group labels
case 2: group DataFrame apply transform function ((f(chunk) -> DataFrame with same indexes) yield DataFrame with resulting chunks glued together
case 3: group Series apply function with f(chunk) -> DataFrame yield DataFrame with result of chunks glued together
【讨论】:
@Keiku 不知道,我以前从未听说过 REPL...但是您是否尝试过func = lambda x : x"? If this doesn't work either, I suggest you open a specific question. You should be able to reproduce just with
applyParallel([('one', 1), ('two', 2)] , your_func)``
感谢您的建议。看来我尝试重新启动控制台并解决了它。很抱歉给您添麻烦了。
文档似乎不再提供示例。有人可以详细说明一下吗?【参考方案3】:
根据this thread,我个人建议使用 dask。
正如@chrisb 所指出的,在 python 中使用 pandas 进行多处理可能会产生不必要的开销。它也可能不像多线程甚至单线程一样执行。
Dask 是专门为多进程创建的。
【讨论】:
【参考方案4】:伴随 JD Long 的回答的简短评论。我发现如果组的数量非常大(比如数十万),并且您的应用功能正在做一些相当简单和快速的事情,那么将您的数据帧分成块并将每个块分配给一个工作人员以执行groupby-apply(串行)比并行 groupby-apply 和让工作人员从包含多个组的队列中读取要快得多。示例:
import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed
nrows = 15000
np.random.seed(1980)
df = pd.DataFrame('a': np.random.permutation(np.arange(nrows)))
所以我们的数据框看起来像:
a
0 3425
1 1016
2 8141
3 9263
4 8018
请注意,“a”列有很多组(想想客户 ID):
len(df.a.unique())
15000
对我们的组进行操作的函数:
def f1(group):
time.sleep(0.0001)
return group
启动一个池:
ppe = ProcessPoolExecutor(12)
futures = []
results = []
做一个并行的groupby-apply:
%%time
for name, group in df.groupby('a'):
p = ppe.submit(f1, group)
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
del ppe
CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s
现在让我们添加一个列,将 df 划分为更少的组:
df['b'] = np.random.randint(0, 12, nrows)
现在只有 12 个,而不是 15000 个组:
len(df.b.unique())
12
我们将对 df 进行分区并对每个块执行 groupby-apply。
ppe = ProcessPoolExecutor(12)
包装乐趣:
def f2(df):
df.groupby('a').apply(f1)
return df
依次发送每个要操作的块:
%%time
for i in df.b.unique():
p = ppe.submit(f2, df[df.b==i])
futures.append(p)
for future in as_completed(futures):
r = future.result()
results.append(r)
df_output = pd.concat(results)
CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s
请注意,每组花费的时间量没有改变。相反,改变的是工作人员从中读取的队列的长度。我怀疑正在发生的事情是工作人员无法同时访问共享内存,并且不断返回以读取队列,从而相互踩踏。使用较大的块进行操作,工人返回的频率较低,因此这个问题得到了改善,整体执行速度更快。
【讨论】:
在我有 4 个物理内核的机器上,如果 f1 的延迟增加,我只能看到并行化的好处,否则串行有更好的时间。【参考方案5】:这似乎可行,虽然它确实应该内置在 pandas 中
import pandas as pd
from joblib import Parallel, delayed
import multiprocessing
def tmpFunc(df):
df['c'] = df.a + df.b
return df
def applyParallel(dfGrouped, func):
retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
return pd.concat(retLst)
if __name__ == '__main__':
df = pd.DataFrame('a': [6, 2, 2], 'b': [4, 5, 6],index= ['g1', 'g1', 'g2'])
print 'parallel version: '
print applyParallel(df.groupby(df.index), tmpFunc)
print 'regular version: '
print df.groupby(df.index).apply(tmpFunc)
print 'ideal version (does not work): '
print df.groupby(df.index).applyParallel(tmpFunc)
【讨论】:
你知道在pandas中加入并行化是否有进展吗? 通过对函数进行小的修改,可以返回常规应用返回的分层索引:def temp_func(func, name, group): return func(group), name def applyParallel(dfGrouped, func): retLst, top_index = zip(*Parallel(n_jobs=multiprocessing.cpu_count())(delayed(temp_func)(func, name, group) for name, group in dfGrouped)) return pd.concat(retLst, keys=top_index)
Dang,我不知道如何在 cmets 中发布代码...
您应该能够通过将applyParallel
绑定到df
来使“理想版本”工作:from types import MethodType; df.applyParallel = MethodType(applyParallel, df)
我已经尝试过这种方法,但它并没有使用所有可用的 cpu,它只使用 1 或 2,即使我有 8 - 有人发生过这种情况吗?
小心,这最终会比单核版本慢!如果您向每个作业发送大量数据但只有很短的计算,那么开销不值得,并且最终会变慢。【参考方案6】:
我有一个用于在 Pandas 中实现并行化的 hack。我将我的数据帧分成块,将每个块放入列表的元素中,然后使用 ipython 的并行位对数据帧列表进行并行应用。然后我使用 pandas concat
函数将列表重新组合在一起。
但是,这并不普遍适用。它对我有用,因为我想应用于数据帧的每个块的功能大约需要一分钟。并且将我的数据拆分和组合起来并不需要那么长时间。所以这显然是一个杂牌。话虽如此,这里有一个例子。我正在使用 Ipython 笔记本,所以你会在我的代码中看到 %%time
魔法:
## make some example data
import pandas as pd
np.random.seed(1)
n=10000
df = pd.DataFrame('mygroup' : np.random.randint(1000, size=n),
'data' : np.random.rand(n))
grouped = df.groupby('mygroup')
对于本例,我将根据上述 groupby 制作“块”,但这不一定是数据的分块方式。虽然这是一个很常见的模式。
dflist = []
for name, group in grouped:
dflist.append(group)
设置并行位
from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True
编写一个愚蠢的函数来应用我们的数据
def myFunc(inDf):
inDf['newCol'] = inDf.data ** 10
return inDf
现在让我们先串行然后并行运行代码。 串行优先:
%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s
现在并行
%%time
parallel_list = lview.map(myFunc, dflist)
CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s
那么只需几毫秒就可以将它们合并回一个数据帧
%%time
combinedDf = pd.concat(parallel_list)
CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms
我在我的 MacBook 上运行了 6 个 IPython 引擎,但您可以看到它将执行时间从 14 秒降至 2 秒。
对于真正长时间运行的随机模拟,我可以通过使用 StarCluster 启动集群来使用 AWS 后端。然而,很多时候,我只在我的 MBP 上跨 8 个 CPU 进行并行化。
【讨论】:
我会用我的代码试试这个,谢谢。你能解释一下为什么 apply 不会自动并行化操作吗?似乎拥有 apply 函数的全部好处是避免循环,但如果它不对这些组执行此操作,有什么好处? 由于 GIL,Python 中的并行化很困难的说法由来已久。请记住,apply 通常是语法糖,并且在它下面执行隐含循环。使用并行化有点棘手,因为并行化存在运行时成本,这有时会抵消并行化的好处。parallel_list
是否缺少定义,因为它在此行出现错误name 'parallel_list' is not defined
:combinedDf = pd.concat(parallel_list)
?
伊万,很明显!我认为他的答案非常好。 hackity hack 比我的少得多。以上是关于在 pandas groupby 之后并行化应用的主要内容,如果未能解决你的问题,请参考以下文章
Pandas 在 groupby 和 nlargest 之后创建额外(重复)索引