提高性能(矢量化?) pandas.groupby.aggregate
Posted
技术标签:
【中文标题】提高性能(矢量化?) pandas.groupby.aggregate【英文标题】:Improve performances (vectorize?) pandas.groupby.aggregate 【发布时间】:2021-03-13 07:31:04 【问题描述】:我正在尝试使用自定义聚合函数来提高 pandas.groupby.aggregate
操作的性能。我注意到 - 如果我错了,请纠正我 - pandas
按顺序调用每个块上的聚合函数(我怀疑它是一个简单的 for
-loop)。
由于pandas
很大程度上基于numpy
,有没有办法使用numpy
的矢量化功能加快计算速度?
我的代码
在我的代码中,我需要将风数据平均样本聚合在一起。虽然平均风速是微不足道的,但平均风向需要更特殊的代码(例如,1 度和 359 度的平均值是 0 度,而不是 180 度)。
我的聚合函数所做的是:
-
移除 NaN
如果不存在其他值,则返回 NaN
检查是否存在指示可变风向的特殊标志。如果是,则返回标志
用vector-averaging algorithm 平均风向
函数是:
def meandir(x):
'''
Parameters
----------
x : pandas.Series
pandas series to be averaged
Returns
-------
float
averaged wind direction
'''
# Removes the NaN from the recording
x = x.dropna()
# If the record is empty, return NaN
if len(x)==0:
return np.nan
# If the record contains variable samples (990) return variable (990)
elif np.any(x == 990):
return 990
# Otherwise sum the vectors and return the angle
else:
angle = np.rad2deg(
np.arctan2(
np.sum(np.sin(np.deg2rad(x))),
np.sum(np.cos(np.deg2rad(x)))
)
)
#Wrap angles from (-pi,pi) to (0,360)
return (angle + 360) % 360
你可以测试一下
from timeit import repeat
import pandas as pd
import numpy as np
N_samples = int(1e4)
N_nan = N_var = int(0.02 * N_samples)
# Generate random data
data = np.random.rand(N_samples,2) * [30, 360]
data[np.random.choice(N_samples, N_nan), 1] = np.nan
data[np.random.choice(N_samples, N_var), 1] = 990
# Create dataset
df = pd.DataFrame(data, columns=['WindSpeed', 'WindDir'])
df.index = pd.date_range(start='2000-01-01 00:00', periods=N_samples, freq='10min')
# Run groupby + aggregate
grouped = df.groupby(pd.Grouper(freq='H')) # Data from 14.30 to 15.29 are rounded to 15.00
aggfuns1 = 'WindSpeed': np.mean, 'WindDir':meandir
aggfuns2 = 'WindSpeed': np.mean, 'WindDir':np.mean
res = repeat(stmt='grouped.agg(aggfuns1)', globals=globals(), number=1, repeat=10)
print(f'With custom aggregating function min(res)*1000:.2f ms')
res = repeat(stmt='grouped.agg(aggfuns2)', globals=globals(), number=1, repeat=10)
print(f'Without custom aggregating function min(res)*1000:.2f ms')
在我的电脑上用于N_samples=1e4
输出:
With custom aggregating function 1500.79 ms
Without custom aggregating function 2.08 ms
自定义聚合函数慢了 750 倍
并带有N_samples=1e6
输出:
With custom aggregating function 142967.17 ms
Without custom aggregating function 21.92 ms
自定义聚合函数慢了 6500 倍!
有没有办法加快这行代码的速度?
【问题讨论】:
快速提问,repeat
函数使用什么?哪个特定的 API?
@AkshaySehgal timeit
。当我粘贴代码时,import
行被剪掉了。我立即编辑以包含它
那个特殊值990
是什么?
@PierreD 它表示可变风向。分发数据的组织 (NOAA) 选择了它。您可以或多或少地将其视为 NaN。但是,虽然 NaN 表示缺失样本,但它表示已测量的样本,但测量值是“可变的”
哦,我明白了:所以如果存在,那么您希望该 bin 中的平均值也是可变的。
【参考方案1】:
关键是尽量矢量化整个df
,让groupby
只使用内置方法。
这是一种方法。诀窍是将角度转换为复数,numpy 会很高兴地求和
(还有groupby
,但groupby
会拒绝mean()
)。因此,我们将角度转换为complex
、sum
,然后
转换回角度。角度的“有趣平均值”与您的代码中使用的相同,并在您引用的 Wikipedia 页面上进行了描述。
关于特殊值 (990
) 的处理,它也可以向量化:比较 s.groupby(...).count()
和 .replace(val, nan).groupby(...).count()
会找到至少有其中一个的所有组。
不管怎样,这里是:
def to_complex(s):
return np.exp(np.deg2rad(s) * 1j)
def to_angle(s):
return np.angle(s, deg=True) % 360
def mask_val(s, grouper, val=990):
return s.groupby(grouper).count() != s.replace(val, np.nan).groupby(grouper).count()
def myagg(df, grouper, val=990, winddir='WindDir'):
s = df[winddir]
mask = mask_val(s, grouper, val)
gb = to_complex(s).groupby(grouper)
s = gb.sum()
cnt = gb.count()
s = to_angle(s) * (cnt / cnt) # put NaN where all NaNs
s[mask] = val
# other columns
agg = df.groupby(grouper).mean()
agg[winddir] = s
return agg
应用:
为方便起见,我将您的示例生成放入函数gen_example(N_samples)
。
df = gen_example(50)
myagg(df, pd.Grouper(freq='H'))
Out[ ]:
WindSpeed WindDir
2000-01-01 00:00:00 12.991717 354.120464
2000-01-01 01:00:00 15.743056 60.813629
2000-01-01 02:00:00 14.593927 245.487383
2000-01-01 03:00:00 17.836368 131.493675
2000-01-01 04:00:00 18.987296 27.150359
2000-01-01 05:00:00 16.415725 194.923399
2000-01-01 06:00:00 20.881816 990.000000
2000-01-01 07:00:00 15.033480 44.626018
2000-01-01 08:00:00 16.276834 29.252459
速度:
df = gen_example(10_000)
%timeit myagg(df, pd.Grouper(freq='H'))
Out[ ]:
6.76 ms ± 12.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
df = gen_example(1e6)
%timeit myagg(df, pd.Grouper(freq='H'))
Out[ ]:
189 ms ± 425 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
测试:
idx = [0] * 4
grouper = pd.Grouper(level=0)
myagg(pd.DataFrame('WindDir': [170, 170, 178, 182], index=idx), grouper)
WindDir
0 174.998473
myagg(pd.DataFrame('WindDir': [330, 359, 1, 40], index=idx), grouper)
WindDir
0 2.251499
myagg(pd.DataFrame('WindDir': [330, 359, 1, np.nan], index=idx), grouper)
WindDir
0 350.102878
myagg(pd.DataFrame('WindDir': [np.nan, np.nan, np.nan, np.nan], index=idx), grouper)
WindDir
0 NaN
myagg(pd.DataFrame('WindDir': [330, 990, 1, np.nan], index=idx), grouper)
WindDir
0 990.0
【讨论】:
为什么角度需要是复数? @jakub:因为在 OP 引用的 Wikipedia 页面上非常雄辩地解释了原因:Mean of circular quantities。我最初错过了这一点,并认为,哦,好吧,可以做一些适当的模数。不是这样。示例:(0, 90, 180, 270)
度的角度平均值是多少?
谢谢!这非常有用——所有内容都有一个***页面。
非常感谢。所以这里的教训是“尽可能多地使用 usa pandas 内置(和矢量化)功能”,并且不要将 pandas.Groupby.aggreagate
与自定义聚合函数一起使用。我必须承认我希望学习如何加速我的自定义函数,而不是恢复到对内置函数的复杂使用。但是,我接受这个答案,因为它以一种非常聪明的方式解决了这个问题——最重要的是——达到了目标。赞!以上是关于提高性能(矢量化?) pandas.groupby.aggregate的主要内容,如果未能解决你的问题,请参考以下文章
使用 Apple Accelerate Framework vForce 库来提高性能
借助 SIMD 数据布局模板和数据预处理提高 SIMD 在动画中的使用效率