将 concurrent.futures.ProcessPoolExecutor 与 DataFrame.GroupBy 一起使用

Posted

技术标签:

【中文标题】将 concurrent.futures.ProcessPoolExecutor 与 DataFrame.GroupBy 一起使用【英文标题】:Using concurrent.futures.ProcessPoolExecutor with DataFrame.GroupBy 【发布时间】:2021-05-25 02:55:25 【问题描述】:

这可能是一个常见问题,但我未能在网上找到任何好的/最新的解决方案。我目前正在为 n 家公司开发一个庞大的数据框,我们对每家公司进行一些繁重的计算,然后将所有结果汇总为一个新的数据框。非常简单,我们运行 df.groupby('company').apply(function) 并在它运行时去喝杯咖啡,因为这是一个单线程操作。

现在事情开始失控了(3 小时+ 等待时间),我们正在试验多处理。我们已经实现了下面的小“概念”,我们将 DataFrame.GroupBy 传递给 executor.map 回调函数并等待结果。

问题是,它似乎需要很长时间才能运行,并且没有关于每个线程中实际发生的事情的反馈。不确定这是正确的实现,我只能看到 CPU 和内存以 100% 的使用率运行,但执行程序永远不会完成。

以下是对数据库中每家公司进行的计算的简化版本。非常感谢您就如何正确使用 groupby 和多处理提供建议。

import time
import concurrent

def append_new_company_technicals(group):
    '''
    Takes a dataframe and build new columns with technical information
    '''
    print(group['ticker'].unique())
    group.sort_values(by='date', inplace=True)

    group['halfvol_30_abs'] = group['px'].rolling(30,min_periods = 21).apply(func)
    group['halfvol_180_abs'] = group['px1'].rolling(180,min_periods = 135).apply(func)
    group['halfvol_30_rel'] = group['px2'].rolling(30,min_periods = 21).apply(func)
    group['halfvol_180_rel'] = group['px3'].rolling(180,min_periods = 135).apply(func)
    return group
    
start = time.time()
with concurrent.futures.ProcessPoolExecutor() as executor:
    futures = executor.map(append_new_company_technicals, df_merged.groupby('ticker'))
end = time.time()
print("MultiProcessing computation:  secs ".format(end - start))

【问题讨论】:

您可能应该考虑使用 Dask,它是为处理数组和数据帧上的多核/分布式计算而构建的。它会比尝试推出自己的实现更好地处理您的任务 我们已经检查了 Dask、Modin 和 Ray,但是如果我们不进行一些重要的代码重构,没有一个可以工作......我们正在对 pandas + python 原生多处理进行最后一次尝试,但你是最终我们可能需要迁移。 【参考方案1】:

如果您在事物运行时需要一些反馈,请尝试将 tqdm 函数(来自 tqdm 包)应用于可迭代对象。在你的情况下 df_merged.groupby('ticker')。

【讨论】:

我认为这应该是评论,而不是答案。 你可能是对的。不幸的是,我是新来的,无法发布 cmets。 酷,从未尝试过 tqdm,感谢您的提示。

以上是关于将 concurrent.futures.ProcessPoolExecutor 与 DataFrame.GroupBy 一起使用的主要内容,如果未能解决你的问题,请参考以下文章

如何将Ios文件上传到

Javascript 将正则表达式 \\n 替换为 \n,将 \\t 替换为 \t,将 \\r 替换为 \r 等等

如何将视频文件转换格式

sh 一个将生成CA的脚本,将CA导入到钥匙串中,然后它将创建一个证书并与CA签名,然后将其导入到

python怎么将0写入文件?

如何将CMD窗口背景改成透明?