如何根据 PySpark 中窗口聚合的条件计算不同值?

Posted

技术标签:

【中文标题】如何根据 PySpark 中窗口聚合的条件计算不同值?【英文标题】:How to count distinct based on a condition over a window aggregation in PySpark? 【发布时间】:2021-11-26 17:50:00 【问题描述】:

这是我拥有的数据的示例数据框:

from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType, DateType, StructType, StructField
from datetime import datetime
from pyspark.sql import Window

data2 = [
  (datetime.strptime("2020/12/29", "%Y/%m/%d"), "Store B", "Product 1", 0),
  (datetime.strptime("2020/12/29", "%Y/%m/%d"), "Store B", "Product 2", 1),
  (datetime.strptime("2020/12/31", "%Y/%m/%d"), "Store A", "Product 2", 1),
  (datetime.strptime("2020/12/31", "%Y/%m/%d"), "Store A", "Product 3", 1),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 1", 1),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 2", 3),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 3", 2),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 1", 10),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 2", 15),
  (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 3", 9),
  (datetime.strptime("2021/01/02", "%Y/%m/%d"), "Store A", "Product 1", 0),
  (datetime.strptime("2021/01/03", "%Y/%m/%d"), "Store A", "Product 2", 2)
]

schema = StructType([ \
    StructField("date",DateType(),True), \
    StructField("store",StringType(),True), \
    StructField("product",StringType(),True), \
    StructField("stock_c", IntegerType(), True)
  ])
 
df = spark.createDataFrame(data=data2,schema=schema)
df.printSchema()
df.show(truncate=False)
root
 |-- date: date (nullable = true)
 |-- store: string (nullable = true)
 |-- product: string (nullable = true)
 |-- stock_c: integer (nullable = true)

+----------+-------+---------+-------+
|date      |store  |product  |stock_c|
+----------+-------+---------+-------+
|2020-12-29|Store B|Product 1|0      |
|2020-12-29|Store B|Product 2|1      |
|2020-12-31|Store A|Product 2|1      |
|2020-12-31|Store A|Product 3|1      |
|2021-01-01|Store A|Product 1|1      |
|2021-01-01|Store A|Product 2|3      |
|2021-01-01|Store A|Product 3|2      |
|2021-01-01|Store B|Product 1|10     |
|2021-01-01|Store B|Product 2|15     |
|2021-01-01|Store B|Product 3|9      |
|2021-01-02|Store A|Product 1|0      |
|2021-01-03|Store A|Product 2|2      |
+----------+-------+---------+-------+

stock_c代表商店中产品的累计库存。

我想创建两个新列,其中一个告诉我商店拥有或过去拥有多少产品。这很容易。我需要的另一列是该商店当天有库存的产品数量,这是我无法解决的问题。

这是我使用的代码:

windowStore = Window.partitionBy("store").orderBy("date")

df \
.withColumn("num_products", approx_count_distinct("product").over(windowStore)) \
.withColumn("num_products_with_stock", approx_count_distinct(when(col("stock_c") > 0, col("product"))).over(windowStore)) \
.show()

这是我得到的:

+----------+-------+---------+-------+------------+-----------------------+
|      date|  store|  product|stock_c|num_products|num_products_with_stock|
+----------+-------+---------+-------+------------+-----------------------+
|2020-12-31|Store A|Product 2|      1|           2|                      2|
|2020-12-31|Store A|Product 3|      1|           2|                      2|
|2021-01-01|Store A|Product 1|      1|           3|                      3|
|2021-01-01|Store A|Product 2|      3|           3|                      3|
|2021-01-01|Store A|Product 3|      2|           3|                      3|
|2021-01-02|Store A|Product 1|      0|           3|                      3|
|2021-01-03|Store A|Product 2|      2|           3|                      3|
|2020-12-29|Store B|Product 1|      0|           2|                      1|
|2020-12-29|Store B|Product 2|      1|           2|                      1|
|2021-01-01|Store B|Product 1|     10|           3|                      3|
|2021-01-01|Store B|Product 2|     15|           3|                      3|
|2021-01-01|Store B|Product 3|      9|           3|                      3|
+----------+-------+---------+-------+------------+-----------------------+

这是我想要的:

+----------+-------+---------+-------+------------+-----------------------+
|      date|  store|  product|stock_c|num_products|num_products_with_stock|
+----------+-------+---------+-------+------------+-----------------------+
|2020-12-31|Store A|Product 2|      1|           2|                      2|
|2020-12-31|Store A|Product 3|      1|           2|                      2|
|2021-01-01|Store A|Product 1|      1|           3|                      3|
|2021-01-01|Store A|Product 2|      3|           3|                      3|
|2021-01-01|Store A|Product 3|      2|           3|                      3|
|2021-01-02|Store A|Product 1|      0|           3|                      2|
|2021-01-03|Store A|Product 2|      2|           3|                      2|
|2020-12-29|Store B|Product 1|      0|           2|                      1|
|2020-12-29|Store B|Product 2|      1|           2|                      1|
|2021-01-01|Store B|Product 1|     10|           3|                      3|
|2021-01-01|Store B|Product 2|     15|           3|                      3|
|2021-01-01|Store B|Product 3|      9|           3|                      3|
+----------+-------+---------+-------+------------+-----------------------+

关键在于这两行,因为产品 1 没有更多库存,那么它应该反映您只有 2 个有库存的产品(产品 2 和产品 3)。

|2021-01-02|Store A|Product 1|      0|           3|                      2|
|2021-01-03|Store A|Product 2|      2|           3|                      2|

我怎样才能实现我想要的?

提前致谢。

【问题讨论】:

【参考方案1】:

您可以在下面找到我用来解决num_products_with_stock 列问题的代码。基本上,我创建了一个新的条件列,当 stock_c 为 0 时替换 None 的产品。在一天结束时,我使用了一个非常接近的代码,就像您使用的那样,但是在我创建的这个新列上做了 F.approx_count_distinct .

from pyspark.sql import functions as F
from pyspark.sql import Window as W

window1 = W.partitionBy("store").orderBy("date")
window2 = W.partitionBy(["store", "date"]).orderBy("date")

df = (df 
        .withColumn("num_products", F.approx_count_distinct("product").over(window1))
        .withColumn('hasItem', F.when(F.col('stock_c') > 0, F.col('product')).otherwise(None))
        .withColumn("num_products_with_stock", F.approx_count_distinct(F.col("hasItem")).over(window2))
        .drop('hasItem')
     )

df.show()

希望这能解决您的问题!

【讨论】:

感谢您的回复@danimille,我想我还需要填写缺失的日期,因为我只有在一种产品的库存变化时才有数据。使用你的代码我没有得到我想要的,但我想我可以解决它。再次感谢!【参考方案2】:

我终于在@danimille 的帮助下解决了这个问题

首先,我完成了缺失的日期,然后使用名为has_stock 的帮助列计算了有库存的产品数量:

from datetime import timedelta
from pyspark.sql.types import ArrayType, TimestampType

def dates_between(t1, t2):
    return [t1 + timedelta(days=x) for x in range(0, int((t2-t1).days) + 1)]
 
dates_between_udf = udf(dates_between, ArrayType(TimestampType()))

date_filler = (
  df
  .withColumn('date', to_timestamp(to_date('date'))) # Ñapa de las gordas
  .withColumn("max_date", max("date").over(Window.partitionBy("store")))
  .withColumn("min_date", min("date").over(Window.partitionBy("store")))
  .withColumn("products", collect_set("product").over(Window.partitionBy("store")))
  .withColumn("dates", dates_between_udf(col("min_date"), col("max_date")))
  .select("store", "products", "dates")
  .distinct()
  .withColumn("product", explode("products"))
  .withColumn("date", explode("dates"))
  .drop("products", "dates")
)



(
  df
  .join(date_filler, on = ["store", "product", "date"], how = "full")
  .withColumn(
    "stock_c", 
    last("stock_c", ignorenulls=True).over(Window.partitionBy("store", "product").orderBy(col("date")))
  )
  .na.fill(0, "stock_c")
  .withColumn("num_products", approx_count_distinct("product").over(windowStore))
  .withColumn("has_stock", when(col("stock_c") > 0, 1).otherwise(0))
  .withColumn("num_products_with_stock", sum("has_stock").over(Window.partitionBy("store", "date")))
  .show()
)

结果如下:

+-------+---------+-------------------+-------+------------+-----------------------+---------+
|  store|  product|               date|stock_c|num_products|num_products_with_stock|has_stock|
+-------+---------+-------------------+-------+------------+-----------------------+---------+
|Store A|Product 1|2020-12-31 00:00:00|      0|           3|                      2|        0|
|Store A|Product 2|2020-12-31 00:00:00|      1|           3|                      2|        1|
|Store A|Product 3|2020-12-31 00:00:00|      1|           3|                      2|        1|
|Store A|Product 1|2021-01-01 00:00:00|      1|           3|                      3|        1|
|Store A|Product 2|2021-01-01 00:00:00|      3|           3|                      3|        1|
|Store A|Product 3|2021-01-01 00:00:00|      2|           3|                      3|        1|
|Store A|Product 1|2021-01-02 00:00:00|      0|           3|                      2|        0|
|Store A|Product 2|2021-01-02 00:00:00|      3|           3|                      2|        1|
|Store A|Product 3|2021-01-02 00:00:00|      2|           3|                      2|        1|
|Store A|Product 1|2021-01-03 00:00:00|      0|           3|                      2|        0|
|Store A|Product 2|2021-01-03 00:00:00|      2|           3|                      2|        1|
|Store A|Product 3|2021-01-03 00:00:00|      2|           3|                      2|        1|
|Store B|Product 1|2020-12-29 00:00:00|      0|           3|                      1|        0|
|Store B|Product 2|2020-12-29 00:00:00|      1|           3|                      1|        1|
|Store B|Product 3|2020-12-29 00:00:00|      0|           3|                      1|        0|
|Store B|Product 1|2020-12-30 00:00:00|      0|           3|                      1|        0|
|Store B|Product 2|2020-12-30 00:00:00|      1|           3|                      1|        1|
|Store B|Product 3|2020-12-30 00:00:00|      0|           3|                      1|        0|
|Store B|Product 1|2020-12-31 00:00:00|      0|           3|                      1|        0|
|Store B|Product 2|2020-12-31 00:00:00|      1|           3|                      1|        1|
+-------+---------+-------------------+-------+------------+-----------------------+---------+
only showing top 20 rows

【讨论】:

以上是关于如何根据 PySpark 中窗口聚合的条件计算不同值?的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 时间序列数据的高性能滚动/窗口聚合

如何在 PySpark 中计算不同窗口大小的滚动总和

如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

具有组间聚合结果的 Pyspark 窗口

pyspark如何在窗口内聚合

Pyspark:如何在不同条件的数据框中创建列