PySpark Groupby 和接收特定列

Posted

技术标签:

【中文标题】PySpark Groupby 和接收特定列【英文标题】:PySpark Groupby and Receive Specific Columns 【发布时间】:2019-12-03 12:46:50 【问题描述】:

如果我有如下的数据框;

ProductId   StoreId Prediction      Index
24524       20      3               19
24524       20      5               20
24524       20      1               21
24524       20      2               22
24524       20      3               23
24524       20      1               24
24524       20      3               25
24524       20      4               26
24524       20      5               27
24524       20      6               28
24524       20      1               29
37654       23      8               9
37654       23      3               10
37654       23      4               11
37654       23      5               12
37654       23      6               13
37654       23      7               14
37654       23      8               15
37654       23      4               16
37654       23      2               17
37654       23      4               18
37654       23      3               19
37654       23      7               20
37654       23      7               21
37654       23      3               22
37654       23      2               23
37654       23      3               24

我想根据每个产品和商店对最后 7 个索引进行平均。

ProductId   StoreId Prediction(Average)
24524       20      3.28                #(This average is include Index 23, 24, 25, 26, 27, 28 and 29) 
37654       23      4.14                #(This average is include Index 18, 19, 20, 21, 22, 23 and 24) 

groupby应该怎么做?

df.groupBy(["ProductId","StoreId"]).agg('Prediction':'avg'))

你能帮我解决这个问题吗?

【问题讨论】:

【参考方案1】:

您可以使用PRECEDING当前行

>>> df2.registerTempTable("temp")

使用与last_value不同

>>> sql("select distinct  ProductId,StoreId,last_value(avg_spent_time) over(partition by ProductId,StoreId order by ProductId,StoreId) as result  from (select ProductId,StoreId,avg(Prediction) over(order by ProductId ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as avg_spent_time from temp) t").show()
+---------+-------+------------------+
|ProductId|StoreId|            result|
+---------+-------+------------------+
|    24524|     20|3.2857142857142856|
|    37654|     23| 4.142857142857143|
+---------+-------+------------------+

【讨论】:

【参考方案2】:

可以通过Window函数来完成:

from pyspark.sql.window import Window
import pyspark.sql.functions as f

# create a Window function
col_list = ['ProductId', 'StoreId']
window = Window.partitionBy([col(x) for x in col_list]).orderBy(df['Index'].desc())

# select last 7 rows per partitions
df = df.select('*', rank().over(window).alias('rank')).filter(col('rank') <= 7).drop('rank')
# calculate average
df.groupBy(["ProductId","StoreId"]).agg(f.avg(f.col("Prediction"))).show()

+---------+-------+------------------+
|ProductId|StoreId|   avg(Prediction)|
+---------+-------+------------------+
|    37654|     23| 4.142857142857143|
|    24524|     20|3.2857142857142856|
+---------+-------+------------------+

【讨论】:

以上是关于PySpark Groupby 和接收特定列的主要内容,如果未能解决你的问题,请参考以下文章

Groupby 和 collect_list 基于 PySpark 中的另一列维护顺序

如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?

检查一列是不是与pyspark中的groupby连续

当percentile_approx基于groupby返回特定列的单个值时,如何选择另一列的对应值?

pyspark groupby 并创建包含其他列字典的列

在 groupby 操作 PySpark 中聚合列中的稀疏向量