How to get percent change year over year by group with PySpark

Posted: 2021-11-25 15:55:32

【Question】

I'm a PySpark beginner trying to calculate the year-over-year percentage change in product counts, grouped by product.

I have this DataFrame:

    prod_desc  year  prod_count
0  product_a  2019          53
1  product_b  2019          44
2  product_c  2019          36
3  product_a  2020          52
4  product_b  2020          43
5  product_c  2020          42

Desired output:

     prod_desc  year  prod_count  Percentage change
0  product_a  2019          53                NaN
1  product_b  2019          44                NaN
2  product_c  2019          36                NaN
3  product_a  2020          52              -1.88
4  product_b  2020          43              -2.27
5  product_c  2020          42              16.60

I can do this in pandas with the logic below; I need to achieve the same with PySpark.

ds['percentage change'] = ds.sort_values('year').groupby('prod_desc')['prod_count'].pct_change()
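In full, the pandas version looks like this (a minimal sketch assuming `ds` holds the table above; `pct_change` returns a fraction, so it is scaled by 100 and rounded here):

import pandas as pd

ds = pd.DataFrame({
    'prod_desc': ['product_a', 'product_b', 'product_c'] * 2,
    'year': [2019] * 3 + [2020] * 3,
    'prod_count': [53, 44, 36, 52, 43, 42],
})

# pct_change compares each row to the previous row within its group
ds['percentage change'] = (
    ds.sort_values('year')
      .groupby('prod_desc')['prod_count']
      .pct_change()
      .mul(100)
      .round(2)
)
print(ds)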

Any help would be appreciated.

【Comments】:

【Answer 1】:

Your DataFrame:

df = spark.createDataFrame(
  [
     (0, 'product_a', 2019, 53)
    ,(1, 'product_b', 2019, 44)
    ,(2, 'product_c', 2019, 36)
    ,(3, 'product_a', 2020, 52)
    ,(4, 'product_b', 2020, 43)
    ,(5, 'product_c', 2020, 42)
  ], ['id', 'prod_desc', 'year', 'prod_count']
)

You can use a window function together with lag:

from pyspark.sql.window import Window
import pyspark.sql.functions as F

# Partition by product and order by year so lag() picks up the previous year's count
W = Window.partitionBy("prod_desc").orderBy('year')

df.withColumn('prod_count_y-1', F.lag(df['prod_count']).over(W))\
  .withColumn('var %', F.round((F.col('prod_count') / F.col('prod_count_y-1') - 1) * 100, 2))\
  .show()

+---+---------+----+----------+--------------+-----+
| id|prod_desc|year|prod_count|prod_count_y-1|var %|
+---+---------+----+----------+--------------+-----+
|  0|product_a|2019|        53|          null| null|
|  3|product_a|2020|        52|            53|-1.89|
|  1|product_b|2019|        44|          null| null|
|  4|product_b|2020|        43|            44|-2.27|
|  2|product_c|2019|        36|          null| null|
|  5|product_c|2020|        42|            36|16.67|
+---+---------+----+----------+--------------+-----+
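If you don't need the helper column, the same computation can be written as a single expression (a small variation reusing the `df` and `W` defined above; the column name `Percentage change` is just illustrative):

df.withColumn(
    'Percentage change',
    # lag() is null for the first year of each product, so the result stays null there
    F.round((F.col('prod_count') - F.lag('prod_count').over(W))
            / F.lag('prod_count').over(W) * 100, 2)
).show()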

【Comments】:

【Answer 2】:

Here is an example that uses Window and lag to track the previous value and compute the percentage change:

from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F

spark = SparkSession.builder.getOrCreate()
data = [
    {"prod_desc": "product_a", "year": 2019, "prod_count": 53},
    {"prod_desc": "product_b", "year": 2019, "prod_count": 44},
    {"prod_desc": "product_c", "year": 2019, "prod_count": 36},
    {"prod_desc": "product_a", "year": 2020, "prod_count": 52},
    {"prod_desc": "product_b", "year": 2020, "prod_count": 43},
    {"prod_desc": "product_c", "year": 2020, "prod_count": 42},
]

df = spark.createDataFrame(data)
window = Window.partitionBy("prod_desc").orderBy("year")
df = df.withColumn("prev_value", F.lag(df.prod_count).over(window))
df = (
    df.withColumn(
        "Percentage change",
        # The first year of each product has no previous value, so the result stays null
        F.when(F.isnull(df.prod_count - df.prev_value), None).otherwise(
            (df.prod_count - df.prev_value) * 100 / df.prev_value
        ),
    )
    .drop("prev_value")
    .orderBy("year", "prod_desc")
)

Result:

+----------+---------+----+-------------------+
|prod_count|prod_desc|year|Percentage change  |
+----------+---------+----+-------------------+
|53        |product_a|2019|null               |
|44        |product_b|2019|null               |
|36        |product_c|2019|null               |
|52        |product_a|2020|-1.8867924528301887|
|43        |product_b|2020|-2.272727272727273 |
|42        |product_c|2020|16.666666666666668 |
+----------+---------+----+-------------------+
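To match the two-decimal figures in the question, the column can be rounded afterwards (a one-line addition on top of the `df` above):

# Round the computed column to two decimals
df = df.withColumn("Percentage change", F.round("Percentage change", 2))
df.show(truncate=False)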

【Comments】:
