How to get percent change year over year by group with PySpark
Posted 2021-11-25 15:55:32

【Question】: PySpark beginner here, trying to calculate the year-over-year percentage change in product counts, grouped by product.
I have this dataframe:
prod_desc year prod_count
0 product_a 2019 53
1 product_b 2019 44
2 product_c 2019 36
3 product_a 2020 52
4 product_b 2020 43
5 product_c 2020 42
Desired output:
prod_desc year prod_count Percentage change
0 product_a 2019 53 NaN
1 product_b 2019 44 NaN
2 product_c 2019 36 NaN
3 product_a 2020 52 -1.88
4 product_b 2020 43 -2.27
5 product_c 2020 42 16.60
I can do this in pandas with the following logic and need to achieve the same with PySpark:
ds['percentage change'] = ds.sort_values('year').groupby('prod_desc')['prod_count'].pct_change() * 100
Any help would be appreciated.
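For reference, the target figures are just (current − previous) / previous × 100 within each product. A quick pure-Python check of that arithmetic, with hypothetical dicts standing in for the grouped rows (note the exact values round to -1.89 and 16.67, as the answers below confirm):

```python
# Year-over-year percent change: (current - previous) / previous * 100.
# Hypothetical dicts standing in for the per-year dataframe rows.
counts_2019 = {"product_a": 53, "product_b": 44, "product_c": 36}
counts_2020 = {"product_a": 52, "product_b": 43, "product_c": 42}

pct_change = {
    prod: round((counts_2020[prod] - counts_2019[prod]) / counts_2019[prod] * 100, 2)
    for prod in counts_2019
}
print(pct_change)
# {'product_a': -1.89, 'product_b': -2.27, 'product_c': 16.67}
```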
【Answer 1】: Your DF:
df = spark.createDataFrame(
    [
        (0, 'product_a', 2019, 53),
        (1, 'product_b', 2019, 44),
        (2, 'product_c', 2019, 36),
        (3, 'product_a', 2020, 52),
        (4, 'product_b', 2020, 43),
        (5, 'product_c', 2020, 42),
    ],
    ['id', 'prod_desc', 'year', 'prod_count'],
)
You can use a window function together with lag:
from pyspark.sql.window import Window
import pyspark.sql.functions as F
W = Window.partitionBy("prod_desc").orderBy('year')
df.withColumn('prod_count_y-1', F.lag(df['prod_count']).over(W)) \
  .withColumn('var %', F.round((F.col('prod_count') / F.col('prod_count_y-1') - 1) * 100, 2)) \
  .show()
+---+---------+----+----------+--------------+-----+
| id|prod_desc|year|prod_count|prod_count_y-1|var %|
+---+---------+----+----------+--------------+-----+
| 0|product_a|2019| 53| null| null|
| 3|product_a|2020| 52| 53|-1.89|
| 1|product_b|2019| 44| null| null|
| 4|product_b|2020| 43| 44|-2.27|
| 2|product_c|2019| 36| null| null|
| 5|product_c|2020| 42| 36|16.67|
+---+---------+----+----------+--------------+-----+
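The window above partitions the rows by prod_desc and orders them by year, so lag hands each row the previous year's count (null for the first row of each partition). A minimal pure-Python sketch of that semantics, to illustrate what the window computes (this is an emulation, not PySpark itself):

```python
from itertools import groupby

# A small subset of the rows from the question.
rows = [
    {"prod_desc": "product_a", "year": 2019, "prod_count": 53},
    {"prod_desc": "product_a", "year": 2020, "prod_count": 52},
    {"prod_desc": "product_b", "year": 2019, "prod_count": 44},
    {"prod_desc": "product_b", "year": 2020, "prod_count": 43},
]

# Emulate lag() over (partition by prod_desc, order by year).
result = []
ordered = sorted(rows, key=lambda r: (r["prod_desc"], r["year"]))
for _, partition in groupby(ordered, key=lambda r: r["prod_desc"]):
    prev = None  # like SQL lag(), the first row of each partition gets null/None
    for r in partition:
        var = round((r["prod_count"] / prev - 1) * 100, 2) if prev is not None else None
        result.append({**r, "prod_count_y-1": prev, "var %": var})
        prev = r["prod_count"]

for row in result:
    print(row)
```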
【Answer 2】: Here is an example that uses Window and lag to track the previous value and compute the percentage change:
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
spark = SparkSession.builder.getOrCreate()
data = [
    {"prod_desc": "product_a", "year": 2019, "prod_count": 53},
    {"prod_desc": "product_b", "year": 2019, "prod_count": 44},
    {"prod_desc": "product_c", "year": 2019, "prod_count": 36},
    {"prod_desc": "product_a", "year": 2020, "prod_count": 52},
    {"prod_desc": "product_b", "year": 2020, "prod_count": 43},
    {"prod_desc": "product_c", "year": 2020, "prod_count": 42},
]
df = spark.createDataFrame(data)
window = Window.partitionBy("prod_desc").orderBy("year")
df = df.withColumn("prev_value", F.lag(df.prod_count).over(window))
df = (
df.withColumn(
"Percentage change",
F.when(F.isnull(df.prod_count - df.prev_value), None).otherwise(
(df.prod_count - df.prev_value) * 100 / df.prev_value
),
)
.drop("prev_value")
.orderBy("year", "prod_desc")
)
Result:
+----------+---------+----+-------------------+
|prod_count|prod_desc|year|Percentage change |
+----------+---------+----+-------------------+
|53 |product_a|2019|null |
|44 |product_b|2019|null |
|36 |product_c|2019|null |
|52 |product_a|2020|-1.8867924528301887|
|43 |product_b|2020|-2.272727272727273 |
|42 |product_c|2020|16.666666666666668 |
+----------+---------+----+-------------------+