通过用向量化替换 lambda x 来提高排名函数的性能

Posted

技术标签:

【中文标题】通过用向量化替换 lambda x 来提高排名函数的性能【英文标题】:Performance enhancement of ranking function by replacement of lambda x with vectorization 【发布时间】:2017-10-17 06:36:21 【问题描述】:

我有一个排名功能,我将其应用于数百万行的大量列,这需要几分钟才能运行。通过删除为应用 .rank( 方法准备数据的所有逻辑,即通过执行以下操作:

ranked = df[['period_id', 'sector_name'] + to_rank].groupby(['period_id', 'sector_name']).transform(lambda x: (x.rank(ascending = True) - 1)*100/len(x))        

我设法把它缩短到几秒钟。但是,我需要保留我的逻辑,并且正在努力重组我的代码:最终,最大的瓶颈是我对 lambda x: 的双重使用,但显然其他方面正在减慢速度(见下文)。我提供了一个示例数据框,以及下面的排名函数,即 MCVE。总的来说,我认为我的问题可以归结为:

(i) 如何将代码中的.apply(lambda x 用法替换为快速的矢量化等效项? (ii) 如何遍历多索引、分组数据帧并应用函数?在我的例子中,对于 date_id 和 category 列的每个唯一组合。 (iii) 我还能做些什么来加快我的排名逻辑?主要开销似乎在.value_counts()。这与上述 (i) 重叠;在发送排名之前,也许可以通过构建临时列来在 df 上完成大部分逻辑。同样,可以在一次调用中对子数据帧进行排名吗? (iv) 为什么使用pd.qcut() 而不是df.rank()?后者是cythonized,似乎对领带的处理更灵活,但我看不到两者之间的比较,pd.qcut() 似乎使用最广泛。

示例输入数据如下:

import pandas as pd
import numpy as np
import random

to_rank = ['var_1', 'var_2', 'var_3']
df = pd.DataFrame('var_1' : np.random.randn(1000), 'var_2' : np.random.randn(1000), 'var_3' : np.random.randn(1000))
df['date_id'] = np.random.choice(range(2001, 2012), df.shape[0])
df['category'] = ','.join(chr(random.randrange(97, 97 + 4 + 1)).upper() for x in range(1,df.shape[0]+1)).split(',')

两个排名函数是:

def rank_fun(df, to_rank): # calls ranking function f(x) to rank each category at each date
    #extra data tidying logic here beyond scope of question - can remove
    ranked = df[to_rank].apply(lambda x: f(x))
    return ranked


def f(x):
    nans = x[np.isnan(x)] # Remove nans as these will be ranked with 50
    sub_df = x.dropna() # 
    nans_ranked = nans.replace(np.nan, 50) # give nans rank of 50

    if len(sub_df.index) == 0: #check not all nan.  If no non-nan data, then return with rank 50
        return nans_ranked

    if len(sub_df.unique()) == 1: # if all data has same value, return rank 50
        sub_df[:] = 50
        return sub_df

    #Check that we don't have too many clustered values, such that we can't bin due to overlap of ties, and reduce bin size provided we can at least quintile rank.
    max_cluster = sub_df.value_counts().iloc[0] #value_counts sorts by counts, so first element will contain the max
    max_bins = len(sub_df) / max_cluster 

    if max_bins > 100: #if largest cluster <1% of available data, then we can percentile_rank
        max_bins = 100

    if max_bins < 5: #if we don't have the resolution to quintile rank then assume no data.
        sub_df[:] = 50
        return sub_df

    bins = int(max_bins) # bin using highest resolution that the data supports, subject to constraints above (max 100 bins, min 5 bins)

    sub_df_ranked = pd.qcut(sub_df, bins, labels=False) #currently using pd.qcut.  pd.rank( seems to have extra functionality, but overheads similar in practice
    sub_df_ranked *= (100 / bins) #Since we bin using the resolution specified in bins, to convert back to decile rank, we have to multiply by 100/bins.  E.g. with quintiles, we'll have scores 1 - 5, so have to multiply by 100 / 5 = 20 to convert to percentile ranking
    ranked_df = pd.concat([sub_df_ranked, nans_ranked])
    return ranked_df

调用我的排名函数并与df重组的代码是:

# ensure don't get duplicate columns if ranking already executed
ranked_cols = [col + '_ranked' for col in to_rank]

ranked = df[['date_id', 'category'] + to_rank].groupby(['date_id', 'category'], as_index = False).apply(lambda x: rank_fun(x, to_rank)) 
ranked.columns = ranked_cols        
ranked.reset_index(inplace = True)
ranked.set_index('level_1', inplace = True)    
df = df.join(ranked[ranked_cols])

我试图通过删除两个 lambda x 调用,尽可能快地获得此排名逻辑;我可以删除 rank_fun 中的逻辑,以便只有 f(x) 的逻辑适用,但我也不知道如何以矢量化方式处理多索引数据帧。另一个问题是关于pd.qcut(df.rank( 之间的差异:似乎两者都有不同的处理关系的方式,但开销似乎相似,尽管事实上 .rank( 被cythonized;考虑到这可能会产生误导主要开销是由于我使用了 lambda x。

我在f(x) 上运行%lprun 得到了以下结果,尽管主要开销是使用.apply(lambda x 而不是矢量化方法:

Line # Hits Time Per Hit % Time Line Contents

 2                                           def tst_fun(df, field):
 3         1          685    685.0      0.2      x = df[field]
 4         1        20726  20726.0      5.8      nans = x[np.isnan(x)]
 5         1        28448  28448.0      8.0      sub_df = x.dropna()
 6         1          387    387.0      0.1      nans_ranked = nans.replace(np.nan, 50)
 7         1            5      5.0      0.0      if len(sub_df.index) == 0: 
 8                                                   pass #check not empty.  May be empty due to nans for first 5 years e.g. no revenue/operating margin data pre 1990
 9                                                   return nans_ranked
10                                           
11         1        65559  65559.0     18.4      if len(sub_df.unique()) == 1: 
12                                                   sub_df[:] = 50 #e.g. for subranks where all factors had nan so ranked as 50 e.g. in 1990
13                                                   return sub_df
14                                           
15                                               #Finally, check that we don't have too many clustered values, such that we can't bin, and reduce bin size provided we can at least quintile rank.
16         1        74610  74610.0     20.9      max_cluster = sub_df.value_counts().iloc[0] #value_counts sorts by counts, so first element will contain the max
17                                               # print(counts)
18         1            9      9.0      0.0      max_bins = len(sub_df) / max_cluster #
19                                           
20         1            3      3.0      0.0      if max_bins > 100: 
21         1            0      0.0      0.0          max_bins = 100 #if largest cluster <1% of available data, then we can percentile_rank
22                                           
23                                           
24         1            0      0.0      0.0      if max_bins < 5: 
25                                                   sub_df[:] = 50 #if we don't have the resolution to quintile rank then assume no data.
26                                           
27                                               #     return sub_df
28                                           
29         1            1      1.0      0.0      bins = int(max_bins) # bin using highest resolution that the data supports, subject to constraints above (max 100 bins, min 5 bins)
30                                           
31                                               #should track bin resolution for all data.  To add.
32                                           
33                                               #if get here, then neither nans_ranked, nor sub_df are empty
34                                               # sub_df_ranked = pd.qcut(sub_df, bins, labels=False)
35         1       160530 160530.0     45.0      sub_df_ranked = (sub_df.rank(ascending = True) - 1)*100/len(x)
36                                           
37         1         5777   5777.0      1.6      ranked_df = pd.concat([sub_df_ranked, nans_ranked])
38                                               
39         1            1      1.0      0.0      return ranked_df

【问题讨论】:

您是否考虑过使用多处理来更快地运行 lambda 语句?我不知道 pandas 处理多处理/多线程的能力如何,但我认为你应该试一试。 谢谢,这是一个有趣的想法。尽管如此,它必须可以矢量化我的“循环”! Numba 或许能够矢量化您的排名函数。 我没有花足够的时间来获得一个很好的答案,但是您是否尝试将数据放入可以并行运行的列中,然后将这些值传递给矢量化函数,比如bn.nanrankdata?这样,您无需调用 python n 次,您可以留在 C 代码中。但这取决于能否拥有一个可以原子地在每一列上运行的函数。你能做到吗? 我不太清楚,但如果它像map 那样工作,也许不将函数封闭在 lambda 上会运行得更快,ranked = df[to_rank].apply(f) 【参考方案1】:

我将使用 numpy 构建一个函数我计划在 pandas groupbygroupby 中定义的每个组中使用它

def rnk(df):
    a = df.values.argsort(0)
    n, m = a.shape
    r = np.arange(a.shape[1])
    b = np.empty_like(a)
    b[a, np.arange(m)[None, :]] = np.arange(n)[:, None]
    return pd.DataFrame(b / n, df.index, df.columns)

gcols = ['date_id', 'category']
rcols = ['var_1', 'var_2', 'var_3']
df.groupby(gcols)[rcols].apply(rnk).add_suffix('_ranked')

   var_1_ranked  var_2_ranked  var_3_ranked
0      0.333333      0.809524      0.428571
1      0.160000      0.360000      0.240000
2      0.153846      0.384615      0.461538
3      0.000000      0.315789      0.105263
4      0.560000      0.200000      0.160000
...

工作原理

因为我知道排名与排序有关,所以我想使用一些巧妙的排序来更快地做到这一点。

numpyargsort 将产生一个排列,可用于将数组分割成排序数组。

a = np.array([25, 300, 7])
b = a.argsort()
print(b)

[2 0 1]

print(a[b])

[  7  25 300]

因此,我将使用argsort 告诉我排名第一、第二和第三的元素在哪里。

# create an empty array that is the same size as b or a
# but these will be ranks, so I want them to be integers
# so I use empty_like(b) because b is the result of 
# argsort and is already integers.
u = np.empty_like(b)

# now just like when I sliced a above with a[b]
# I slice u the same way but instead I assign to
# those positions, the ranks I want.
# In this case, I defined the ranks as np.arange(b.size) + 1
u[b] = np.arange(b.size) + 1

print(u)

[2 3 1]

这完全正确。 7 排在最后,但我们排名第一。 300 排在第二位,是我们的第三位。 25是第一名,是我们的第二名。

最后,我除以排名中的数字得到百分位数。碰巧的是,因为我使用了从零开始的排名 np.arange(n),而不是在我们的示例中使用从零开始的排名 np.arange(1, n+1)np.arange(n) + 1,所以我可以通过简单的除法来获得百分位数。 剩下要做的就是将此逻辑应用于每个组。我们可以在pandasgroupby 中做到这一点 一些缺失的细节包括我如何使用argsort(0) 对每列进行独立排序,以及我做了一些花哨的切片来独立重新排列每一列。

我们可以避免 groupby 并让 numpy 完成整个工作吗?我还将利用 numba 及时编译到用njit加速一些事情

from numba import njit

@njit
def count_factor(f):
    c = np.arange(f.max() + 2) * 0
    for i in f:
        c[i + 1] += 1
    return c

@njit
def factor_fun(f):
    c = count_factor(f)
    cc = c[:-1].cumsum()
    return c[1:][f], cc[f]

def lexsort(a, f):
    n, m = a.shape
    f = f * (a.max() - a.min() + 1)
    return (f.reshape(-1, 1) + a).argsort(0)


def rnk_numba(df, gcols, rcols):
    tups = list(zip(*[df[c].values.tolist() for c in gcols]))
    f = pd.Series(tups).factorize()[0]
    a = lexsort(np.column_stack([df[c].values for c in rcols]), f)
    c, cc = factor_fun(f)
    c = c[:, None]
    cc = cc[:, None]
    n, m = a.shape
    r = np.arange(a.shape[1])
    b = np.empty_like(a)
    b[a, np.arange(m)[None, :]] = np.arange(n)[:, None]
    return pd.DataFrame((b - cc) / c, df.index, rcols).add_suffix('_ranked')

工作原理

老实说,这在心理上很难处理。我会继续扩展我上面解释的内容。 我想再次使用argsort 将排名降到正确的位置。但是,我必须与分组列抗衡。所以我要做的是编译tuples 和factorize 的列表,如this question here 中所述 现在我有一组分解的tuples,我可以执行修改后的lexsort,它在分解的tuple 组中排序。 This question addresses the lexsort. 还有一个棘手的问题有待解决,我必须根据每个组的大小来抵消新找到的排名,以便为每个组获得新的排名。这在下面代码中的小 sn-p b - cc 中得到了解决。但计算cc 是必要的组成部分。

这就是一些高级哲学。 @njit呢?

请注意,当我分解时,我将映射到整数0n - 1,其中n 是唯一分组tuples 的数量。我可以使用长度为 n 的数组作为跟踪计数的便捷方式。 为了完成groupby 偏移量,我需要跟踪这些组的位置的计数和累积计数,因为它们在tuples 列表或那些tuples 的分解版本中表示。我决定对分解数组f 进行线性扫描,并在numba 循环中计算观察结果。虽然我有这些信息,但我也会生成必要的信息来生成我还需要的累积偏移量。 numba 提供了一个接口来产生高效的编译函数。这很挑剔,您必须获得一些经验才能知道什么是可能的,什么是不可能的。我决定numbafy 两个函数,前面有一个numba 装饰器@njit。这种编码在没有这些装饰器的情况下也能正常工作,但使用它们会加快速度。

时机

%%timeit 
ranked_cols = [col + '_ranked' for col in to_rank]
​
ranked = df[['date_id', 'category'] + to_rank].groupby(['date_id', 'category'], as_index = False).apply(lambda x: rank_fun(x, to_rank)) 
ranked.columns = ranked_cols        
ranked.reset_index(inplace = True)
ranked.set_index('level_1', inplace = True)  
1 loop, best of 3: 481 ms per loop

gcols = ['date_id', 'category']
rcols = ['var_1', 'var_2', 'var_3']

%timeit df.groupby(gcols)[rcols].apply(rnk_numpy).add_suffix('_ranked')
100 loops, best of 3: 16.4 ms per loop

%timeit rnk_numba(df, gcols, rcols).head()
1000 loops, best of 3: 1.03 ms per loop

【讨论】:

感谢您,速度提高了约 26 倍!但它给我的代码提供了不同的等级,也不适合 nans?另外,我注意到您使用 .apply(rnk) 没有 lambda x:这是捷径吗?如果您准确解释 rnk( 的每一行如何工作以及您的代码在做什么,因为我的 numpy 非常弱,我们将不胜感激。 其他人注意:排名是相同的(除了nan问题),但我的输出排名排序不同,如果您比较索引,您可以看到。例如my_ranks.var_1_ranked / new_ranks.var_1_ranked = 100 我会在回到电脑前添加更多评论。另外,你想让 nan 怎么处理?没有等级或nan?排名最差? 谢谢。另外,使用 transform( 而不是 apply( 会更快吗?我认为这是经过优化的,因此可以消除一些开销。 到目前为止,我已经为我所做的工作添加了更多颜色。稍后我会联系nans。我已经包含了几个指向其他问题的链接,这些问题旨在为您提供更好的答案。请随时为这些问题以及那些在不知不觉中帮助你的人提供答案:-)【参考方案2】:

我建议你试试这个代码。比你的快 3 倍,更清晰。

排名函数:

def rank(x):
    counts = x.value_counts()
    bins = int(0 if len(counts) == 0 else x.count() / counts.iloc[0])
    bins = 100 if bins > 100 else bins
    if bins < 5:
        return x.apply(lambda x: 50)
    else:
        return (pd.qcut(x, bins, labels=False) * (100 / bins)).fillna(50).astype(int)

单线程应用:

for col in to_rank:
    df[col + '_ranked'] = df.groupby(['date_id', 'category'])[col].apply(rank)

多线程应用:

import sys
from multiprocessing import Pool

def tfunc(col):
    return df.groupby(['date_id', 'category'])[col].apply(rank)

pool = Pool(len(to_rank))
result = pool.map_async(tfunc, to_rank).get(sys.maxint)
for (col, val) in zip(to_rank, result):
    df[col + '_ranked'] = val

【讨论】:

谢谢,我收到一个 IndexError 错误,例如IndexError: index 30 is out of bounds for axis 0 with size 6 with pd.qcut(x, bins, labels=False) bins 需要为 qcut 的整数,因此修复此错误就像将函数的最后一行更改为返回 (pd.qcut(x, int(bins), labels=False) 有趣的是使用转换比你的循环慢! df.groupby(['date_id', 'category'])[to_rank].transform(rank) 1 个循环,最好的 3 个:每个循环 214 毫秒。知道为什么吗?这对我来说似乎违反直觉! 据我了解(不太好), .transform( 的全部意义在于您可以一次将多个列/系列传递给您的函数,然后每列独立循环,而使用 .apply(,列是单独传递的,这显然有额外的开销。在您的代码中,通过循环 to_rank,您正在为 to_rank 中的每一列执行新的 groupby 调用和 Series selection [col],但尽管如此,它仍然比变换快! 根据您的提醒,我更正了代码中的一些错误。并添加多线程示例代码,会快一点。

以上是关于通过用向量化替换 lambda x 来提高排名函数的性能的主要内容,如果未能解决你的问题,请参考以下文章

如何提高在 DolphinDB 中计算希腊语的性能?

向量化

匿名函数,向量化和预分配,函数的函数,P码文件

提高性能(矢量化?) pandas.groupby.aggregate

矢量化

lambda x: float(x[1:-1]) 这个函数是啥意思,python 3.5