PySpark Groupby 和接收特定列
Posted
技术标签:
【中文标题】PySpark Groupby 和接收特定列【英文标题】:PySpark Groupby and Receive Specific Columns 【发布时间】:2019-12-03 12:46:50 【问题描述】:如果我有如下的数据框;
ProductId StoreId Prediction Index
24524 20 3 19
24524 20 5 20
24524 20 1 21
24524 20 2 22
24524 20 3 23
24524 20 1 24
24524 20 3 25
24524 20 4 26
24524 20 5 27
24524 20 6 28
24524 20 1 29
37654 23 8 9
37654 23 3 10
37654 23 4 11
37654 23 5 12
37654 23 6 13
37654 23 7 14
37654 23 8 15
37654 23 4 16
37654 23 2 17
37654 23 4 18
37654 23 3 19
37654 23 7 20
37654 23 7 21
37654 23 3 22
37654 23 2 23
37654 23 3 24
我想根据每个产品和商店对最后 7 个索引进行平均。
ProductId StoreId Prediction(Average)
24524 20 3.28 #(This average is include Index 23, 24, 25, 26, 27, 28 and 29)
37654 23 4.14 #(This average is include Index 18, 19, 20, 21, 22, 23 and 24)
groupby应该怎么做?
df.groupBy(["ProductId","StoreId"]).agg('Prediction':'avg'))
你能帮我解决这个问题吗?
【问题讨论】:
【参考方案1】:您可以使用PRECEDING和当前行
>>> df2.registerTempTable("temp")
使用与last_value不同
>>> sql("select distinct ProductId,StoreId,last_value(avg_spent_time) over(partition by ProductId,StoreId order by ProductId,StoreId) as result from (select ProductId,StoreId,avg(Prediction) over(order by ProductId ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) as avg_spent_time from temp) t").show()
+---------+-------+------------------+
|ProductId|StoreId| result|
+---------+-------+------------------+
| 24524| 20|3.2857142857142856|
| 37654| 23| 4.142857142857143|
+---------+-------+------------------+
【讨论】:
【参考方案2】:可以通过Window函数来完成:
from pyspark.sql.window import Window
import pyspark.sql.functions as f
# create a Window function
col_list = ['ProductId', 'StoreId']
window = Window.partitionBy([col(x) for x in col_list]).orderBy(df['Index'].desc())
# select last 7 rows per partitions
df = df.select('*', rank().over(window).alias('rank')).filter(col('rank') <= 7).drop('rank')
# calculate average
df.groupBy(["ProductId","StoreId"]).agg(f.avg(f.col("Prediction"))).show()
+---------+-------+------------------+
|ProductId|StoreId| avg(Prediction)|
+---------+-------+------------------+
| 37654| 23| 4.142857142857143|
| 24524| 20|3.2857142857142856|
+---------+-------+------------------+
【讨论】:
以上是关于PySpark Groupby 和接收特定列的主要内容,如果未能解决你的问题,请参考以下文章
Groupby 和 collect_list 基于 PySpark 中的另一列维护顺序
如何将 groupBy 和聚合函数应用于 PySpark DataFrame 中的特定窗口?