在 Pyspark 中查找给定时间窗口中的行数

Posted

技术标签:

【中文标题】在 Pyspark 中查找给定时间窗口中的行数【英文标题】:FInd number of rows in given time window in Pyspark 【发布时间】:2019-11-06 19:14:41 【问题描述】:

我有一个 PySpark 数据框,其中一小部分如下所示:

+------+-----+-------------------+-----+
|  name| type|          timestamp|score|
+------+-----+-------------------+-----+
| name1|type1|2012-01-10 00:00:00|   11|
| name1|type1|2012-01-10 00:00:10|   14|
| name1|type1|2012-01-10 00:00:20|    2|
| name1|type1|2012-01-10 00:00:30|    3|
| name1|type1|2012-01-10 00:00:40|   55|
| name1|type1|2012-01-10 00:00:50|   10|
| name5|type1|2012-01-10 00:01:00|    5|
| name2|type2|2012-01-10 00:01:10|    8|
| name5|type1|2012-01-10 00:01:20|    1|
|name10|type1|2012-01-10 00:01:30|   12|
|name11|type3|2012-01-10 00:01:40|  512|
+------+-----+-------------------+-----+

对于选定的时间窗口(例如,假设5 days),我想找出每个name 有多少score(例如num_values_week)值。也就是说,name12012-01-10 - 2012-01-14 之间,然后在 2012-01-15 - 2012-01-29 之间有多少 score 值等等(对于所有其他名称也是如此,例如 name2 等等。)

我想将此信息转换到新的 PySpark 数据框中,该数据框中将包含 nametypenum_values_week 列。我该怎么做?

在我之前问过的similar question 中,我看到了当一个人选择一周的间隔时如何获得(分数)计数。但是,在这个问题中,我想知道当一个人在时间窗口中选择任何可调整的值(如5 days 左右)时如何获得分数。

任何帮助将不胜感激。

【问题讨论】:

【参考方案1】:

pd.GrouperDataFrame.groupy 一起使用:

#df['timestamp']=pd.to_datetime(df['timestamp']) #to convert to datetime
new_df=( df.groupby([pd.Grouper(key='timestamp',freq='5D'),'name'],sort=False)
          .score
          .count()
          .rename('num_values_week')
          .reset_index() )
print(new_df)

输出

   timestamp    name  num_values_week
0 2012-01-10   name1                6
1 2012-01-10   name5                2
2 2012-01-10   name2                1
3 2012-01-10  name10                1
4 2012-01-10  name11                1

GroupBy.resample:

new_df=( df.groupby('name',sort=False)
           .resample('5D',on='timestamp')
           .count()
           .score
           .rename('num_values_week')
           .reset_index() )
print(new_df)

输出

     name  timestamp  num_values_week
0   name1 2012-01-10                6
1   name5 2012-01-10                2
2   name2 2012-01-10                1
3  name10 2012-01-10                1
4  name11 2012-01-10                1

如果您想在原始 df 中创建一个新列,请使用 transform:

df['num_values_week']=df.groupby([pd.Grouper(key='timestamp',freq='5D'),'name']).score.transform('count')
print(df)

      name   type           timestamp  score  num_values_week
0    name1  type1 2012-01-10 00:00:00     11                6
1    name1  type1 2012-01-10 00:00:10     14                6
2    name1  type1 2012-01-10 00:00:20      2                6
3    name1  type1 2012-01-10 00:00:30      3                6
4    name1  type1 2012-01-10 00:00:40     55                6
5    name1  type1 2012-01-10 00:00:50     10                6
6    name5  type1 2012-01-10 00:01:00      5                2
7    name2  type2 2012-01-10 00:01:10      8                1
8    name5  type1 2012-01-10 00:01:20      1                2
9   name10  type1 2012-01-10 00:01:30     12                1
10  name11  type3 2012-01-10 00:01:40    512                1

【讨论】:

以上是关于在 Pyspark 中查找给定时间窗口中的行数的主要内容,如果未能解决你的问题,请参考以下文章

如何在不使用count(*)的情况下在greenplum中查找表中的行数

如何在 pyspark 中查找不合规的行

30 天滚动窗口中的行数

SQL/PySpark:创建一个包含过去 n 天的行数的新列

如何查找 MinMaxScaler 对象中的行数和列数?

从列表中创建一个 pyspark 数据框列,其中列表的长度与数据框的行数相同