在 pandas groupby 之后并行化应用

Posted

技术标签:

【中文标题】在 pandas groupby 之后并行化应用【英文标题】:Parallelize apply after pandas groupby 【发布时间】:2014-11-29 00:46:19 【问题描述】:

我在groupby之后使用rosetta.parallel.pandas_easy并行化apply,例如:

from rosetta.parallel.pandas_easy import groupby_to_series_to_frame
df = pd.DataFrame('a': [6, 2, 2], 'b': [4, 5, 6],index= ['g1', 'g1', 'g2'])
groupby_to_series_to_frame(df, np.mean, n_jobs=8, use_apply=True, by=df.index)

但是,有没有人想出如何并行化返回 DataFrame 的函数?正如预期的那样,此代码对于 rosetta 失败。

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

df.groupby(df.index).apply(tmpFunc)
groupby_to_series_to_frame(df, tmpFunc, n_jobs=1, use_apply=True, by=df.index)

【问题讨论】:

【参考方案1】:

编辑:为了在 pandas groupby 上获得更好的计算性能,您可以使用 numba 在运行时将您的代码编译成 C 代码并以 C 速度运行。如果你在groupby 之后应用的函数是纯numpy 计算,它会超级快(比这个并行化快很多)。

您可以使用multiprocessingjoblib 来实现并行化。但是,如果组的数量很大并且每个组的DataFrame很大,则运行时间可能会更糟,因为您需要将这些组多次转移到CPU中。为了减少开销,我们可以先将数据分成大块,然后在这些块上并行计算。

例如,假设您正在处理股票数据,您需要按代码对股票进行分组,然后计算一些统计数据。您可以先按代码的第一个字符(大块)进行分组,然后在这个虚拟组中执行操作:

import pandas as pd
from joblib import Parallel, delayed

def group_func(dummy_group):
    # Do something to the group just like doing to the original dataframe.
    #     Example: calculate daily return.
    res = []
    for _, g in dummy_group.groupby('code'):
        g['daily_return']  = g.close / g.close.shift(1)
        res.append(g)
    return pd.concat(res)

stock_data = stock_data.assign(dummy=stock_data['code'].str[0])

Parallel(n_jobs=-1)(delayed(group_func)(group) for _, group in stock_data.groupby('dummy'))

【讨论】:

【参考方案2】:

Ivan 的回答很棒,不过看起来可以稍微简化一下,也不需要依赖joblib:

from multiprocessing import Pool, cpu_count

def applyParallel(dfGrouped, func):
    with Pool(cpu_count()) as p:
        ret_list = p.map(func, [group for name, group in dfGrouped])
    return pandas.concat(ret_list)

顺便说一句:这不能代替 any groupby.apply(),但它会涵盖典型情况:例如它应该涵盖案例 2 和案例 3 in the documentation,而您应该通过将参数 axis=1 提供给最终的 pandas.concat() 调用来获得案例 1 的行为。

编辑: 文档已更改;旧版本可以在here找到,无论如何我复制粘贴下面的三个例子。

case 1: group DataFrame apply aggregation function (f(chunk) -> Series) yield DataFrame, with group axis having group labels

case 2: group DataFrame apply transform function ((f(chunk) -> DataFrame with same indexes) yield DataFrame with resulting chunks glued together

case 3: group Series apply function with f(chunk) -> DataFrame yield DataFrame with result of chunks glued together

【讨论】:

@Keiku 不知道,我以前从未听说过 REPL...但是您是否尝试过 func = lambda x : x"? If this doesn't work either, I suggest you open a specific question. You should be able to reproduce just with applyParallel([('one', 1), ('two', 2)] , your_func)`` 感谢您的建议。看来我尝试重新启动控制台并解决了它。很抱歉给您添麻烦了。 文档似乎不再提供示例。有人可以详细说明一下吗?【参考方案3】:

根据this thread,我个人建议使用 dask。

正如@chrisb 所指出的,在 python 中使用 pandas 进行多处理可能会产生不必要的开销。它也可能像多线程甚至单线程一样执行。

Dask 是专门为多进程创建的。

【讨论】:

【参考方案4】:

伴随 JD Long 的回答的简短评论。我发现如果组的数量非常大(比如数十万),并且您的应用功能正在做一些相当简单和快速的事情,那么将您的数据帧分成块并将每个块分配给一个工作人员以执行groupby-apply(串行)比并行 groupby-apply 和让工作人员从包含多个组的队列中读取要快得多。示例:

import pandas as pd
import numpy as np
import time
from concurrent.futures import ProcessPoolExecutor, as_completed

nrows = 15000
np.random.seed(1980)
df = pd.DataFrame('a': np.random.permutation(np.arange(nrows)))

所以我们的数据框看起来像:

    a
0   3425
1   1016
2   8141
3   9263
4   8018

请注意,“a”列有很多组(想想客户 ID):

len(df.a.unique())
15000

对我们的组进行操作的函数:

def f1(group):
    time.sleep(0.0001)
    return group

启动一个池:

ppe = ProcessPoolExecutor(12)
futures = []
results = []

做一个并行的groupby-apply:

%%time

for name, group in df.groupby('a'):
    p = ppe.submit(f1, group)
    futures.append(p)

for future in as_completed(futures):
    r = future.result()
    results.append(r)

df_output = pd.concat(results)
del ppe

CPU times: user 18.8 s, sys: 2.15 s, total: 21 s
Wall time: 17.9 s

现在让我们添加一个列,将 df 划分为更少的组:

df['b'] = np.random.randint(0, 12, nrows)

现在只有 12 个,而不是 15000 个组:

len(df.b.unique())
12

我们将对 df 进行分区并对每个块执行 groupby-apply。

ppe = ProcessPoolExecutor(12)

包装乐趣:

def f2(df):
    df.groupby('a').apply(f1)
    return df

依次发送每个要操作的块:

%%time

for i in df.b.unique():
    p = ppe.submit(f2, df[df.b==i])
    futures.append(p)

for future in as_completed(futures):
    r = future.result()
    results.append(r)

df_output = pd.concat(results) 

CPU times: user 11.4 s, sys: 176 ms, total: 11.5 s
Wall time: 12.4 s

请注意,每组花费的时间量没有改变。相反,改变的是工作人员从中读取的队列的长度。我怀疑正在发生的事情是工作人员无法同时访问共享内存,并且不断返回以读取队列,从而相互踩踏。使用较大的块进行操作,工人返回的频率较低,因此这个问题得到了改善,整体执行速度更快。

【讨论】:

在我有 4 个物理内核的机器上,如果 f1 的延迟增加,我只能看到并行化的好处,否则串行有更好的时间。【参考方案5】:

这似乎可行,虽然它确实应该内置在 pandas 中

import pandas as pd
from joblib import Parallel, delayed
import multiprocessing

def tmpFunc(df):
    df['c'] = df.a + df.b
    return df

def applyParallel(dfGrouped, func):
    retLst = Parallel(n_jobs=multiprocessing.cpu_count())(delayed(func)(group) for name, group in dfGrouped)
    return pd.concat(retLst)

if __name__ == '__main__':
    df = pd.DataFrame('a': [6, 2, 2], 'b': [4, 5, 6],index= ['g1', 'g1', 'g2'])
    print 'parallel version: '
    print applyParallel(df.groupby(df.index), tmpFunc)

    print 'regular version: '
    print df.groupby(df.index).apply(tmpFunc)

    print 'ideal version (does not work): '
    print df.groupby(df.index).applyParallel(tmpFunc)

【讨论】:

你知道在pandas中加入并行化是否有进展吗? 通过对函数进行小的修改,可以返回常规应用返回的分层索引:def temp_func(func, name, group): return func(group), name def applyParallel(dfGrouped, func): retLst, top_index = zip(*Parallel(n_jobs=multiprocessing.cpu_count())(delayed(temp_func)(func, name, group) for name, group in dfGrouped)) return pd.concat(retLst, keys=top_index) Dang,我不知道如何在 cmets 中发布代码... 您应该能够通过将applyParallel 绑定到df 来使“理想版本”工作:from types import MethodType; df.applyParallel = MethodType(applyParallel, df) 我已经尝试过这种方法,但它并没有使用所有可用的 cpu,它只使用 1 或 2,即使我有 8 - 有人发生过这种情况吗? 小心,这最终会比单核版本慢!如果您向每个作业发送大量数据但只有很短的计算,那么开销不值得,并且最终会变慢。【参考方案6】:

我有一个用于在 Pandas 中实现并行化的 hack。我将我的数据帧分成块,将每个块放入列表的元素中,然后使用 ipython 的并行位对数据帧列表进行并行应用。然后我使用 pandas concat 函数将列表重新组合在一起。

但是,这并不普遍适用。它对我有用,因为我想应用于数据帧的每个块的功能大约需要一分钟。并且将我的数据拆分和组合起来并不需要那么长时间。所以这显然是一个杂牌。话虽如此,这里有一个例子。我正在使用 Ipython 笔记本,所以你会在我的代码中看到 %%time 魔法:

## make some example data
import pandas as pd

np.random.seed(1)
n=10000
df = pd.DataFrame('mygroup' : np.random.randint(1000, size=n), 
                   'data' : np.random.rand(n))
grouped = df.groupby('mygroup')

对于本例,我将根据上述 groupby 制作“块”,但这不一定是数据的分块方式。虽然这是一个很常见的模式。

dflist = []
for name, group in grouped:
    dflist.append(group)

设置并行位

from IPython.parallel import Client
rc = Client()
lview = rc.load_balanced_view()
lview.block = True

编写一个愚蠢的函数来应用我们的数据

def myFunc(inDf):
    inDf['newCol'] = inDf.data ** 10
    return inDf

现在让我们先串行然后并行运行代码。 串行优先:

%%time
serial_list = map(myFunc, dflist)
CPU times: user 14 s, sys: 19.9 ms, total: 14 s
Wall time: 14 s

现在并行

%%time
parallel_list = lview.map(myFunc, dflist)

CPU times: user 1.46 s, sys: 86.9 ms, total: 1.54 s
Wall time: 1.56 s

那么只需几毫秒就可以将它们合并回一个数据帧

%%time
combinedDf = pd.concat(parallel_list)
 CPU times: user 296 ms, sys: 5.27 ms, total: 301 ms
Wall time: 300 ms

我在我的 MacBook 上运行了 6 个 IPython 引擎,但您可以看到它将执行时间从 14 秒降至 2 秒。

对于真正长时间运行的随机模拟,我可以通过使用 StarCluster 启动集群来使用 AWS 后端。然而,很多时候,我只在我的 MBP 上跨 8 个 CPU 进行并行化。

【讨论】:

我会用我的代码试试这个,谢谢。你能解释一下为什么 apply 不会自动并行化操作吗?似乎拥有 apply 函数的全部好处是避免循环,但如果它不对这些组执行此操作,有什么好处? 由于 GIL,Python 中的并行化很困难的说法由来已久。请记住,apply 通常是语法糖,并且在它下面执行隐含循环。使用并行化有点棘手,因为并行化存在运行时成本,这有时会抵消并行化的好处。 parallel_list 是否缺少定义,因为它在此行出现错误name 'parallel_list' is not definedcombinedDf = pd.concat(parallel_list) 伊万,很明显!我认为他的答案非常好。 hackity hack 比我的少得多。

以上是关于在 pandas groupby 之后并行化应用的主要内容,如果未能解决你的问题,请参考以下文章

对 pandas 中的多层次数据进行子集化

在groupby之后访问pandas中的分层列

Pandas 在 groupby 和 nlargest 之后创建额外(重复)索引

在csv,pandas中的groupby之后创建自定义列

如何在 Pandas 中使用 apply 并行化许多(模糊)字符串比较?

如何在 Pandas 中使用 apply 并行化许多(模糊)字符串比较?