如何加快遍历数组/矩阵的速度?尝试了 pandas 和 numpy 数组

Posted

技术标签:

【中文标题】如何加快遍历数组/矩阵的速度?尝试了 pandas 和 numpy 数组【英文标题】:How to speed up iterating through an array / matrix? Tried both pandas and numpy arrays 【发布时间】:2018-08-16 18:05:35 【问题描述】:

我想通过一个大型二维数组(15,100m)做一些特征丰富。

处理包含 100,000 条记录的样本集表明我需要更快地完成此操作。

编辑(数据模型信息)

为了简化,假设我们只有两个相关的列:

IP(标识符) Unix(自 1970 年以来的时间戳,以秒为单位)

我想添加第三列,计算该 IP 在过去 12 小时内出现的次数。

结束编辑

我的第一次尝试是使用 pandas,因为它使用命名维度很舒服,但是太慢了:

for index,row in tqdm_notebook(myData.iterrows(),desc='iterrows'):
# how many times was the IP address (and specific device) around in the prior 5h?
    hours = 12
    seen = myData[(myData['ip']==row['ip'])
                 &(myData['device']==row['device'])
                 &(myData['os']==row['os'])
                 &(myData['unix']<row['unix'])
                 &(myData['unix']>(row['unix']-(60*60*hours)))].shape[0]
    ip_seen = myData[(myData['ip']==row['ip'])
                 &(myData['unix']<row['unix'])
                 &(myData['unix']>(row['unix']-(60*60*hours)))].shape[0]
    myData.loc[index,'seen'] = seen
    myData.loc[index,'ip_seen'] = ip_seen

然后我切换到 numpy 数组并希望得到更好的结果,但是运行完整数据集仍然太慢:

# speed test numpy arrays
for i in np.arange(myArray.shape[0]):
    hours = 12
    ip,device,os,ts = myArray[i,[0,3,4,12]]
    ip_seen = myArray[(np.where((myArray[:,0]==ip) 
                            & (myArray[:,12]<ts)
                            & (myArray[:,12]>(ts-60*60*hours) )))].shape[0]
    device_seen = myArray[(np.where((myArray[:,0]==ip) 
                            & (myArray[:,2] == device)
                            & (myArray[:,3] == os)
                            & (myArray[:,12]<ts)
                            & (myArray[:,12]>(ts-60*60*hours) )))].shape[0]
    myArray[i,13]=ip_seen
    myArray[i,14]=device_seen

我的下一个想法是只迭代一次,并保持一个不断增长的当前计数字典,而不是在每次迭代中向后看。

但这会有其他一些缺点(例如,如何跟踪何时减少 12 小时窗口外的观察计数)。

您将如何解决这个问题?

是否可以选择使用低级 Tensorflow 函数来涉及 GPU?

谢谢

【问题讨论】:

您能否提供您的数据样本以及该样本的预期输出? 在 python 中,无论您使用哪个库,迭代行都会很慢。您的数据库是哪种格式?如果您的数据库很大,那么 pandas 可能不是操作它的最佳选择。为什么不直接使用 SQL? @jdehesa,请参阅上面更新的数据模型帖子(简化以说明这个想法)。 @Imanol,DB 无关紧要,现在这一切都发生在内存中,对于完整的数据集,如果太大,它也可以分批运行。 SQL 不是为频繁重新获取的“滚动”更新而设计的。我认为这会更慢,因为它会一直从磁盘重新读取。 如果将时间设置为索引,则可以使用DataFrame.rolling。请提供一些示例数据,以便我们测试一些可能的解决方案 【参考方案1】:

正如@jdehesa 的评论中提到的,我采用了另一种方法,它只允许我对整个数据集进行一次迭代并从索引中提取(衰减的)权重。

decay_window = 60*60*12 # every 12
decay = 0.5 # fall by 50% every window
ip_idx = pd.DataFrame(myData.ip.unique())
ip_idx['ts_seen'] = 0
ip_idx['ip_seen'] = 0
ip_idx.columns = ['ip','ts_seen','ip_seen']
ip_idx.set_index('ip',inplace=True)

for index, row in myData.iterrows(): # all
    # How often was this IP seen?
    prior_ip_seen = ip_idx.loc[(row['ip'],'ip_seen')]
    prior_ts_seen = ip_idx.loc[(row['ip'],'ts_seen')]
    delay_since_count = row['unix']-ip_idx.loc[(row['ip'],'ts_seen')]
    new_ip_seen = prior_ip_seen*decay**(delay_since_count/decay_window)+1
    ip_idx.loc[(row['ip'],'ip_seen')] = new_ip_seen
    ip_idx.loc[(row['ip'],'ts_seen')] = row['unix']
    myData.iloc[index,14] = new_ip_seen-1

这样一来,结果就不是最初要求的固定时间窗口,而是先前的观察随着时间的推移“淡出”,从而使最近的频繁观察具有更高的权重。

此功能比最初计划的简化(结果更昂贵)方法包含更多信息。

感谢您的意见!

编辑

与此同时,我为相同的操作切换到 numpy 数组,现在只需要一小部分时间(在

以防万一有人寻找起点:

%%time
import sys
## temporary lookup
ip_seen_ts = [0]*365000
ip_seen_count = [0]*365000
cnt = 0
window = 60*60*12 # 12h
decay = 0.5
counter = 0
chunksize = 10000000
store = pd.HDFStore('store.h5')
t = time.process_time()
try:
    store.remove('myCount')
except:
    print("myData not present.")
for myHdfData in store.select_as_multiple(['myData','myFeatures'],columns=['ip','unix','ip_seen'],chunksize=chunksize):
    print(counter, time.process_time() - t)
    #display(myHdfData.head(5))
    counter+=chunksize
    t = time.process_time()
    sys.stdout.flush()
    keep_index = myHdfData.index.values
    myArray = myHdfData.as_matrix()
    for row in myArray[:,:]:
        #for row in myArray:
        i = (row[0].astype('uint32')) # IP as identifier
        u = (row[1].astype('uint32')) # timestamp
        try:
            delay = u - ip_seen_ts[i]
        except:
            delay = 0
        ip_seen_ts[i] = u
        try:
            ip_seen_count[i] = ip_seen_count[i]*decay**(delay/window)+1
        except:
            ip_seen_count[i] = 1
        row[3] = np.tanh(ip_seen_count[i]-1) # tanh to normalize between 0 and 1
    myArrayAsDF = pd.DataFrame(myArray,columns=['c_ip','c_unix','c_ip2','ip_seen'])
    myArrayAsDF.set_index(keep_index,inplace=True)
    store.append('myCount',myArrayAsDF)
store.close()

【讨论】:

【参考方案2】:

加快速度的唯一方法是循环。在您的情况下,您可以尝试将 rolling 与您想要的时间跨度窗口一起使用,使用 Unix 时间戳作为日期时间索引(假设记录按时间戳排序,否则您需要先排序)。这应该适用于ip_seen

ip = myData['ip']
ip.index = pd.to_datetime(myData['unix'], unit='s')
myData['ip_seen'] = ip.rolling('5h')
    .agg(lambda w: np.count_nonzero(w[:-1] == w[-1]))
    .values.astype(np.int32)

但是,当聚合涉及多个列时,例如在seen 列中,它会变得更加复杂。目前(参见Pandas issue #15095)滚动函数不支持跨越两个维度的聚合。一种解决方法可能是将感兴趣的列合并到一个新系列中,例如一个元组(如果值是数字,这可能会更好)或字符串(如果值已经是字符串,这可能会更好)。例如:

criteria = myData['ip'] + '|' + myData['device'] + '|' + myData['os']
criteria.index = pd.to_datetime(myData['unix'], unit='s')
myData['seen'] = criteria.rolling('5h')
    .agg(lambda w: np.count_nonzero(w[:-1] == w[-1]))
    .values.astype(np.int32)

编辑

显然rolling 仅适用于数字类型,因此有两个选项:

    处理数据以使用数字类型。对于 IP,这很容易,因为它实际上代表一个 32 位数字(如果我猜是 IPv6,则为 64)。对于设备和操作系统,假设它们现在是字符串,它会变得更加复杂,您必须将每个可能的值映射到一个整数并将其与 IP 合并为一个长值,例如将它们放在更高的位或类似的位置(使用 IPv6 甚至是不可能的,因为 NumPy 目前支持的最大整数是 64 位)。

    翻转myData的索引(现在应该是不是日期时间,因为rolling也不能使用它)并使用索引窗口获取必要的数据并进行操作:

    # Use sequential integer index
    idx_orig = myData.index
    myData.reset_index(drop=True, inplace=True)
    # Index to roll
    idx = pd.Series(myData.index)
    idx.index = pd.to_datetime(myData['unix'], unit='s')
    # Roll aggregation function
    def agg_seen(w, data, fields):
        # Use slice for faster data frame slicing
        slc = slice(int(w[0]), int(w[-2])) if len(w) > 1 else []
        match = data.loc[slc, fields] == data.loc[int(w[-1]), fields]
        return np.count_nonzero(np.all(match, axis=1))
    # Do rolling
    myData['ip_seen'] = idx.rolling('5h') \
        .agg(lambda w: agg_seen(w, myData, ['ip'])) \
        .values.astype(np.int32)
    myData['ip'] = idx.rolling('5h') \
        .agg(lambda w: agg_seen(w, myData, ['ip', 'device', 'os'])) \
        .values.astype(np.int32)
    # Put index back
    myData.index = idx_orig
    

    这不是 rolling 的用途,不过,我不确定这是否比循环提供更好的性能。

【讨论】:

太好了,我不知道滚动可以处理可变时间窗口。会试一试并恢复。谢谢! @szeta 我注意到rolling 不适用于字符串,它似乎跳过了每个非数字值,我正在寻找替代路径。 @szeta 我已经添加了一些使用 rolling 的东西,它适用于字符串,虽然它看起来很慢,不确定它是否比你的原始代码更好...... @szeta 顺便说一句,您是在当前记录之前 的时间窗口中寻找模式重复还是包括它? (也就是说,由于计算当前行,所有结果都应该至少为 1 吗?) 我一直在玩滚动功能,但没有任何明显的改进。在我们的讨论之后,我意识到它可能更容易(甚至更好)而不是滚动计数窗口,而是为观察之间的计数建立半衰期。还没来得及写代码,但一旦我有有用的东西就会在这里分享。

以上是关于如何加快遍历数组/矩阵的速度?尝试了 pandas 和 numpy 数组的主要内容,如果未能解决你的问题,请参考以下文章

Python: Pandas - 嵌套循环需要很长时间才能完成。如何加快速度?

对 pandas 数据框的索引查找。为何这么慢?如何加快速度? [复制]

逐元素矩阵乘法:R 与 Rcpp(如何加快此代码的速度?)

加速python中的元素数组乘法

如何使用 PANDAS / Python 将矩阵转换为列数组

如何使用熊猫来加快这个嵌套循环的速度?