在执行一些额外操作的同时将数据帧重新采样为新数据帧

Posted

技术标签:

【中文标题】在执行一些额外操作的同时将数据帧重新采样为新数据帧【英文标题】:Resampling a dataframe into a new one while doing some additional operations 【发布时间】:2019-11-01 04:32:33 【问题描述】:

我正在使用一个数据框,其中每个条目(行)都带有开始时间、持续时间和其他属性。我想从这个数据框创建一个新的数据框,我会将每个条目从原始条目转换为 15 分钟的间隔,同时保持所有其他属性相同。旧数据帧中每个条目的新数据帧中的条目数量将取决于原始数据帧的实际持续时间。

起初我尝试使用 pd.resample,但它并没有完全达到我的预期。然后,我使用itertuples() 构建了一个运行良好的函数,但它花了大约半小时,数据帧大约有 3000 行。现在我想对 200 万行做同样的事情,所以我正在寻找其他可能性。

假设我有以下数据框:

testdict = 'start':['2018-01-05 11:48:00', '2018-05-04 09:05:00', '2018-08-09 07:15:00', '2018-09-27 15:00:00'], 'duration':[22,8,35,2], 'Attribute_A':['abc', 'def', 'hij', 'klm'], 'id': [1,2,3,4]
testdf = pd.DataFrame(testdict)
testdf.loc[:,['start']] = pd.to_datetime(testdf['start'])
print(testdf)

>>>testdf
                 start  duration Attribute_A  id
0  2018-01-05 11:48:00        22         abc   1
1  2018-05-04 09:05:00         8         def   2
2  2018-08-09 07:15:00        35         hij   3
3  2018-09-27 15:00:00         2         klm   4

我希望我的结果如下所示:

>>>resultdf
                start  duration Attribute_A  id
0 2018-01-05 11:45:00        12         abc   1
1 2018-01-05 12:00:00        10         abc   1
2 2018-05-04 09:00:00         8         def   2
3 2018-08-09 07:15:00        15         hij   3
4 2018-08-09 07:30:00        15         hij   3
5 2018-08-09 07:45:00         5         hij   3
6 2018-09-27 15:00:00         2         klm   4

这是我用 itertuples 构建的函数,它产生了所需的结果(我在上面显示的那个):

def min15_divider(df,newdf):
for row in df.itertuples():
    orig_min = row.start.minute
    remains = orig_min % 15 # Check if it is already a multiple of 15
    if remains == 0:
        new_time = row.start.replace(second=0)
        if row.duration < 15: # if it shorter than 15 min just use that for the duration
            to_append = 'start': new_time, 'Attribute_A': row.Attribute_A,
                         'duration': row.duration, 'id':row.id
            newdf = newdf.append(to_append, ignore_index=True)
        else: # if not, divide that in 15 min intervals until duration is exceeded
            cumu_dur = 15
            while cumu_dur < row.duration:
                to_append = 'start': new_time, 'Attribute_A': row.Attribute_A, 'id':row.id
                if cumu_dur < 15:
                    to_append['duration'] = cumu_dur
                else:
                    to_append['duration'] = 15
                new_time = new_time + pd.Timedelta('15 minutes')
                cumu_dur = cumu_dur + 15
                newdf = newdf.append(to_append, ignore_index=True)

            else: # add the remainder in the last 15 min interval
                final_dur = row.duration - (cumu_dur - 15)
                to_append = 'start': new_time, 'Attribute_A': row.Attribute_A,'duration': final_dur, 'id':row.id
                newdf = newdf.append(to_append, ignore_index=True)

    else: # When it is not an exact multiple of 15 min
        new_min = orig_min - remains # convert to multiple of 15
        new_time = row.start.replace(minute=new_min)
        new_time = new_time.replace(second=0)
        cumu_dur = 15 - remains # remaining minutes in the initial interval
        while cumu_dur < row.duration: # divide total in 15 min intervals until duration is exceeded
            to_append = 'start': new_time, 'Attribute_A': row.Attribute_A, 'id':row.id
            if cumu_dur < 15:
                to_append['duration'] = cumu_dur
            else:
                to_append['duration'] = 15

            new_time = new_time + pd.Timedelta('15 minutes')
            cumu_dur = cumu_dur + 15
            newdf = newdf.append(to_append, ignore_index=True)

        else: # when we reach the last interval or the starting duration was less than the remaining minutes
            if row.duration < 15:
                final_dur = row.duration # original duration less than remaining minutes in first interval
            else:
                final_dur = row.duration - (cumu_dur - 15) # remaining duration in last interval
            to_append = 'start': new_time, 'Attribute_A': row.Attribute_A, 'duration': final_dur, 'id':row.id
            newdf = newdf.append(to_append, ignore_index=True)
return newdf

有没有其他不使用itertuples 的方法可以节省我一些时间?

提前致谢。

附言。对于我的帖子中可能看起来有点奇怪的任何内容,我深表歉意,因为这是我第一次在 *** 中自己提出问题。

编辑

许多条目可以有相同的开始时间,所以.groupby 'start' 可能有问题。但是,每个条目都有一个具有唯一值的列,简称为“id”。

【问题讨论】:

11:48:00什么情况下需要回溯原时间? 它会被视为在 11:45 时间步开始,并且由于它持续的时间比该时间间隔内的剩余分钟长,它会延长到下一个 (12:00)。这就是为什么在结果数据框中,该条目被分为一个开始时间为 11:45(持续时间为 12 分钟)的条目和另一个开始时间为 12 的条目(持续时间为剩余 10 分钟)。 是否有重叠时间?如果时间重叠会怎样? 在重叠时间的情况下,'start' 会有多个具有相同值的条目,但它们对于其余属性会有不同的值(在此示例中,我只显示了 'Attribute_A' 但还有更多)。 @BlueSombrero 除了第一个 11:48:00 之外的所有日期时间值,它们都可以被 15 分钟整除,还是有更多随机值? 【参考方案1】:

使用pd.resample 是个好主意,但由于每行只有开始时间,因此需要先构建结束行才能使用它。

下面的代码假定'start' 列中的每个开始时间都是唯一的,因此grouby 可以以一种不寻常的方式使用,因为它只会提取一行。 我使用groupby,因为它会自动重新组合apply 使用的自定义函数生成的数据帧。 另请注意,'duration' 列会在几分钟内转换为 timedelta,以便稍后更好地执行一些数学运算。

import pandas as pd

testdict = 'start':['2018-01-05 11:48:00', '2018-05-04 09:05:00', '2018-08-09 07:15:00', '2018-09-27 15:00:00'], 'duration':[22,8,35,2], 'Attribute_A':['abc', 'def', 'hij', 'klm']
testdf = pd.DataFrame(testdict)
testdf['start'] = pd.to_datetime(testdf['start'])
testdf['duration'] = pd.to_timedelta(testdf['duration'], 'T')
print(testdf)

def calcduration(df, starttime):
    if len(df) == 1:
        return
    elif len(df) == 2:
        df['duration'].iloc[0] = pd.Timedelta(15, 'T') - (starttime - df.index[0])
        df['duration'].iloc[1] = df['duration'].iloc[1] - df['duration'].iloc[0]
    elif len(df) > 2:
        df['duration'].iloc[0] = pd.Timedelta(15, 'T') - (starttime - df.index[0])
        df['duration'].iloc[1:-1] = pd.Timedelta(15, 'T')
        df['duration'].iloc[-1] = df['duration'].iloc[-1] - df['duration'].iloc[:-1].sum()

def expandtime(x):
    frow = x.copy()
    frow['start'] = frow['start'] + frow['duration']
    gdf = pd.concat([x, frow], axis=0)
    gdf = gdf.set_index('start')
    resdf = gdf.resample('15T').nearest()
    calcduration(resdf, x['start'].iloc[0])
    return resdf

findf = testdf.groupby('start', as_index=False).apply(expandtime)
print(findf)

这段代码产生:

                      duration Attribute_A
  start                                   
0 2018-01-05 11:45:00 00:12:00         abc
  2018-01-05 12:00:00 00:10:00         abc
1 2018-05-04 09:00:00 00:08:00         def
2 2018-08-09 07:15:00 00:15:00         hij
  2018-08-09 07:30:00 00:15:00         hij
  2018-08-09 07:45:00 00:05:00         hij
3 2018-09-27 15:00:00 00:02:00         klm

一点解释

expandtime 是第一个自定义函数。它需要一行的数据框(因为我们假设'start' 值是唯一的),构建第二行,其'start' 等于第一行的'start' + 持续时间,然后使用resample 对其进行采样时间间隔为 15 分钟。所有其他列的值都是重复的。

calcduration 用于对列'duration' 进行一些数学运算,以计算每行的正确持续时间。

【讨论】:

这非常有效!我无法相信它与我之前编写的函数(问题本身中的函数)相比有多快。我只做了一次调整:由于'start' 值不是唯一的,我使用groupby()'start' 以及每行唯一的另一个属性。我仍然不太明白你为什么首先使用groupby()。你能再给我解释一下,以便我将来可以做更多这样的事情吗?非常感谢你! @华伦天奴 构造groupby(colname).apply(func) 允许将func 应用于groupby 标识的每个子集。在这种特殊情况下,子集必须是单行。它会自动将所有对func 的调用返回的详细子集重新组合到一个数据帧中。它是使用itertuples() 循环遍历行并将结果附加到newdf 的替代方法。通常更快,因为一切都由 pandas 内部完成。【参考方案2】:

所以,从你的 df 开始:

testdict = 'start':['2018-01-05 11:48:00', '2018-05-04 09:05:00', '2018-08-09 07:15:00', '2018-09-27 15:00:00'], 'duration':[22,8,35,2], 'Attribute_A':['abc', 'def', 'hij', 'klm']
df = pd.DataFrame(testdict)
df.loc[:,['start']] = pd.to_datetime(df['start'])
print(df)

首先计算每一行的结束时间:

df['dur'] = pd.to_timedelta(df['duration'], unit='m')
df['end'] = df['start'] + df['dur']

然后创建两个新列来保存固定间隔(15 分钟)的开始和结束日期:

df['start15'] = df['start'].dt.floor('15min')
df['end15'] = df['end'].dt.floor('15min')

此时,数据框如下所示:

  Attribute_A  duration               start      dur                 end start15               end15
0         abc        22 2018-01-05 11:48:00 00:22:00 2018-01-05 12:10:00 2018-01-05 11:45:00 2018-01-05 12:00:00  
1         def         8 2018-05-04 09:05:00 00:08:00 2018-05-04 09:13:00 2018-05-04 09:00:00 2018-05-04 09:00:00     
2         hij        35 2018-08-09 07:15:00 00:35:00 2018-08-09 07:50:00 2018-08-09 07:15:00 2018-08-09 07:45:00   
3         klm         2 2018-09-27 15:00:00 00:02:00 2018-09-27 15:02:00 2018-09-27 15:00:00 2018-09-27 15:00:00 

start15end15 列组合在一起具有正确的时间,但您需要将它们合并:

df = pd.melt(df, ['dur', 'start', 'Attribute_A', 'end'], ['start15', 'end15'], value_name='start15')
df = df.drop('variable', 1).drop_duplicates('start15').sort_values('start15').set_index('start15')

输出:

                         dur               start Attribute_A
start15                                                     
2018-01-05 11:45:00 00:22:00 2018-01-05 11:48:00         abc
2018-01-05 12:00:00 00:22:00 2018-01-05 11:48:00         abc
2018-05-04 09:00:00 00:08:00 2018-05-04 09:05:00         def
2018-08-09 07:15:00 00:35:00 2018-08-09 07:15:00         hij
2018-08-09 07:45:00 00:35:00 2018-08-09 07:15:00         hij
2018-09-27 15:00:00 00:02:00 2018-09-27 15:00:00         klm

看起来不错,但缺少 2018-08-09 07:30:00 行。使用 groupby 填充此行和任何其他缺失的行并重新采样:

df = df.groupby('start').resample('15min').ffill().reset_index(0, drop=True).reset_index()

取回end15 列,它在之前的熔化操作中被丢弃:

df['end15'] = df['end'].dt.floor('15min')

然后计算每一行的正确持续时间。我将其分为两个计算(跨越多个时间步长的持续时间,以及不跨越多个时间步长的持续时间)以保持可读性:

df.loc[df['start15'] != df['end15'], 'duration'] = np.minimum(df['end15'] - df['start'], pd.Timedelta('15min').to_timedelta64())
df.loc[df['start15'] == df['end15'], 'duration'] = np.minimum(df['end'] - df['end15'], df['end'] - df['start'])

然后进行一些清理以使其看起来像您想要的那样:

df['duration'] = (df['duration'].dt.seconds/60).astype(int)
print(df)
df = df[['start15', 'duration', 'Attribute_A']].copy()

结果:

              start15  duration Attribute_A
0 2018-01-05 11:45:00        12         abc
1 2018-01-05 12:00:00        10         abc
2 2018-05-04 09:00:00         8         def
3 2018-08-09 07:15:00        15         hij
4 2018-08-09 07:30:00        15         hij
5 2018-08-09 07:45:00         5         hij
6 2018-09-27 15:00:00         2         klm

请注意,此答案的部分内容基于this answer

【讨论】:

以上是关于在执行一些额外操作的同时将数据帧重新采样为新数据帧的主要内容,如果未能解决你的问题,请参考以下文章

如何将数据帧切片转换为新数据帧

每小时重新采样数据帧

将数据帧重新采样为具有任意期末月份的 n 个月期间

pyspark:在日期和时间上重新采样 pyspark 数据帧

重新采样并附加到相同的数据帧

如何计算熊猫中重新采样的多索引数据帧