How to count distinct based on a condition over a window aggregation in PySpark?
Posted: 2021-11-26 17:50:00

Here is a sample dataframe of the data I have:
from pyspark.sql.functions import *
from pyspark.sql.types import StringType, IntegerType, DateType, StructType, StructField
from datetime import datetime
from pyspark.sql import Window
data2 = [
    (datetime.strptime("2020/12/29", "%Y/%m/%d"), "Store B", "Product 1", 0),
    (datetime.strptime("2020/12/29", "%Y/%m/%d"), "Store B", "Product 2", 1),
    (datetime.strptime("2020/12/31", "%Y/%m/%d"), "Store A", "Product 2", 1),
    (datetime.strptime("2020/12/31", "%Y/%m/%d"), "Store A", "Product 3", 1),
    (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 1", 1),
    (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 2", 3),
    (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store A", "Product 3", 2),
    (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 1", 10),
    (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 2", 15),
    (datetime.strptime("2021/01/01", "%Y/%m/%d"), "Store B", "Product 3", 9),
    (datetime.strptime("2021/01/02", "%Y/%m/%d"), "Store A", "Product 1", 0),
    (datetime.strptime("2021/01/03", "%Y/%m/%d"), "Store A", "Product 2", 2)
]
schema = StructType([
    StructField("date", DateType(), True),
    StructField("store", StringType(), True),
    StructField("product", StringType(), True),
    StructField("stock_c", IntegerType(), True)
])

df = spark.createDataFrame(data=data2, schema=schema)
df.printSchema()
df.show(truncate=False)
root
|-- date: date (nullable = true)
|-- store: string (nullable = true)
|-- product: string (nullable = true)
|-- stock_c: integer (nullable = true)
+----------+-------+---------+-------+
|date |store |product |stock_c|
+----------+-------+---------+-------+
|2020-12-29|Store B|Product 1|0 |
|2020-12-29|Store B|Product 2|1 |
|2020-12-31|Store A|Product 2|1 |
|2020-12-31|Store A|Product 3|1 |
|2021-01-01|Store A|Product 1|1 |
|2021-01-01|Store A|Product 2|3 |
|2021-01-01|Store A|Product 3|2 |
|2021-01-01|Store B|Product 1|10 |
|2021-01-01|Store B|Product 2|15 |
|2021-01-01|Store B|Product 3|9 |
|2021-01-02|Store A|Product 1|0 |
|2021-01-03|Store A|Product 2|2 |
+----------+-------+---------+-------+
The stock_c column represents the cumulative stock of a product in the store.

I want to create two new columns. One tells me how many products the store has or has ever had; that one is easy. The other is the number of products the store has in stock on that day, and that is the one I cannot work out.
Here is the code I am using:
windowStore = Window.partitionBy("store").orderBy("date")

df \
    .withColumn("num_products", approx_count_distinct("product").over(windowStore)) \
    .withColumn("num_products_with_stock", approx_count_distinct(when(col("stock_c") > 0, col("product"))).over(windowStore)) \
    .show()
This is what I get:
+----------+-------+---------+-------+------------+-----------------------+
| date| store| product|stock_c|num_products|num_products_with_stock|
+----------+-------+---------+-------+------------+-----------------------+
|2020-12-31|Store A|Product 2| 1| 2| 2|
|2020-12-31|Store A|Product 3| 1| 2| 2|
|2021-01-01|Store A|Product 1| 1| 3| 3|
|2021-01-01|Store A|Product 2| 3| 3| 3|
|2021-01-01|Store A|Product 3| 2| 3| 3|
|2021-01-02|Store A|Product 1| 0| 3| 3|
|2021-01-03|Store A|Product 2| 2| 3| 3|
|2020-12-29|Store B|Product 1| 0| 2| 1|
|2020-12-29|Store B|Product 2| 1| 2| 1|
|2021-01-01|Store B|Product 1| 10| 3| 3|
|2021-01-01|Store B|Product 2| 15| 3| 3|
|2021-01-01|Store B|Product 3| 9| 3| 3|
+----------+-------+---------+-------+------------+-----------------------+
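As far as I can tell, the cause is that windowStore has an orderBy but no explicit frame, so Spark defaults to a running frame: every row aggregates over all of the store's rows up to the current date, so a product that ever had stock stays counted forever and the column can never decrease. Spelling out the implicit default frame:

# Equivalent to windowStore above: with orderBy and no frame spec, Spark
# uses RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW, i.e. a running
# window over all of the store's earlier dates.
windowStore_explicit = (
    Window.partitionBy("store")
    .orderBy("date")
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)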
And this is what I want:
+----------+-------+---------+-------+------------+-----------------------+
| date| store| product|stock_c|num_products|num_products_with_stock|
+----------+-------+---------+-------+------------+-----------------------+
|2020-12-31|Store A|Product 2| 1| 2| 2|
|2020-12-31|Store A|Product 3| 1| 2| 2|
|2021-01-01|Store A|Product 1| 1| 3| 3|
|2021-01-01|Store A|Product 2| 3| 3| 3|
|2021-01-01|Store A|Product 3| 2| 3| 3|
|2021-01-02|Store A|Product 1| 0| 3| 2|
|2021-01-03|Store A|Product 2| 2| 3| 2|
|2020-12-29|Store B|Product 1| 0| 2| 1|
|2020-12-29|Store B|Product 2| 1| 2| 1|
|2021-01-01|Store B|Product 1| 10| 3| 3|
|2021-01-01|Store B|Product 2| 15| 3| 3|
|2021-01-01|Store B|Product 3| 9| 3| 3|
+----------+-------+---------+-------+------------+-----------------------+
The key is in these two rows: because Product 1 has no stock left on those dates, the column should reflect that only 2 products have stock (Product 2 and Product 3).
|2021-01-02|Store A|Product 1| 0| 3| 2|
|2021-01-03|Store A|Product 2| 2| 3| 2|
How can I achieve what I want?

Thanks in advance.
Answer 1:

Below you can find the code I used to solve the num_products_with_stock problem. Basically, I create a new conditional column that replaces the product with None whenever stock_c is 0. At the end of the day, the code is very close to yours, but F.approx_count_distinct is applied to this new column instead.
from pyspark.sql import functions as F
from pyspark.sql import Window as W
window1 = W.partitionBy("store").orderBy("date")
window2 = W.partitionBy(["store", "date"]).orderBy("date")
df = (df
    .withColumn("num_products", F.approx_count_distinct("product").over(window1))
    # Product name when in stock, None otherwise (nulls are ignored by the count)
    .withColumn("hasItem", F.when(F.col("stock_c") > 0, F.col("product")).otherwise(None))
    # Distinct in-stock products per store and day
    .withColumn("num_products_with_stock", F.approx_count_distinct(F.col("hasItem")).over(window2))
    .drop("hasItem")
)
df.show()
Hope this solves your problem!
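One caveat: F.approx_count_distinct is an approximate, HyperLogLog-based aggregate. With this few distinct values it should be exact in practice, but if guaranteed exact counts are required, a variant I would suggest (a sketch, not taken from my tested code) is to collect the distinct values into a set and take its size:

# Exact alternative: the conditional when() yields null for out-of-stock
# rows, collect_set ignores nulls, and size() counts what survives.
df_exact = (df
    .withColumn("num_products", F.size(F.collect_set("product").over(window1)))
    .withColumn("num_products_with_stock",
                F.size(F.collect_set(F.when(F.col("stock_c") > 0, F.col("product"))).over(window2)))
)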
Comments:
Thanks for your reply @danimille. I think I also need to fill in the missing dates, since I only have data when a product's stock changes. With your code I do not get exactly what I want, but I think I can work it out from there. Thanks again!

Answer 2:

I finally solved this with @danimille's help. First I filled in the missing dates, and then counted the products with stock using a helper column called has_stock:
from datetime import timedelta
from pyspark.sql.types import ArrayType, TimestampType
def dates_between(t1, t2):
    # Every day from t1 to t2, inclusive
    return [t1 + timedelta(days=x) for x in range(0, int((t2 - t1).days) + 1)]

dates_between_udf = udf(dates_between, ArrayType(TimestampType()))
date_filler = (
    df
    .withColumn("date", to_timestamp(to_date("date")))  # huge kludge: cast to timestamp so the dates match the UDF's TimestampType output
    .withColumn("max_date", max("date").over(Window.partitionBy("store")))
    .withColumn("min_date", min("date").over(Window.partitionBy("store")))
    .withColumn("products", collect_set("product").over(Window.partitionBy("store")))
    .withColumn("dates", dates_between_udf(col("min_date"), col("max_date")))
    .select("store", "products", "dates")
    .distinct()
    # Explode to one row per (store, product, date) over the full date range
    .withColumn("product", explode("products"))
    .withColumn("date", explode("dates"))
    .drop("products", "dates")
)
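As an aside, on Spark 2.4+ the Python UDF can be replaced by the built-in sequence function, which builds the date range natively and makes the timestamp hack unnecessary. A sketch of the same date_filler (my adaptation, not the code I actually ran):

# Spark 2.4+ alternative: sequence(start, stop, interval 1 day) returns an
# array of dates directly, so no UDF and no date-to-timestamp cast is needed.
date_filler_native = (
    df
    .withColumn("min_date", min("date").over(Window.partitionBy("store")))
    .withColumn("max_date", max("date").over(Window.partitionBy("store")))
    .withColumn("products", collect_set("product").over(Window.partitionBy("store")))
    .withColumn("dates", expr("sequence(min_date, max_date, interval 1 day)"))
    .select("store", "products", "dates")
    .distinct()
    .withColumn("product", explode("products"))
    .withColumn("date", explode("dates"))
    .drop("products", "dates")
)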
(
    df
    .join(date_filler, on=["store", "product", "date"], how="full")
    # Forward-fill the cumulative stock onto the newly created dates
    .withColumn(
        "stock_c",
        last("stock_c", ignorenulls=True).over(Window.partitionBy("store", "product").orderBy(col("date")))
    )
    # A product has stock 0 before its first recorded movement
    .na.fill(0, "stock_c")
    .withColumn("num_products", approx_count_distinct("product").over(windowStore))
    .withColumn("has_stock", when(col("stock_c") > 0, 1).otherwise(0))
    # Exact count per store and day: sum the 0/1 flag over (store, date)
    .withColumn("num_products_with_stock", sum("has_stock").over(Window.partitionBy("store", "date")))
    .show()
)
The result looks like this:
+-------+---------+-------------------+-------+------------+-----------------------+---------+
| store| product| date|stock_c|num_products|num_products_with_stock|has_stock|
+-------+---------+-------------------+-------+------------+-----------------------+---------+
|Store A|Product 1|2020-12-31 00:00:00| 0| 3| 2| 0|
|Store A|Product 2|2020-12-31 00:00:00| 1| 3| 2| 1|
|Store A|Product 3|2020-12-31 00:00:00| 1| 3| 2| 1|
|Store A|Product 1|2021-01-01 00:00:00| 1| 3| 3| 1|
|Store A|Product 2|2021-01-01 00:00:00| 3| 3| 3| 1|
|Store A|Product 3|2021-01-01 00:00:00| 2| 3| 3| 1|
|Store A|Product 1|2021-01-02 00:00:00| 0| 3| 2| 0|
|Store A|Product 2|2021-01-02 00:00:00| 3| 3| 2| 1|
|Store A|Product 3|2021-01-02 00:00:00| 2| 3| 2| 1|
|Store A|Product 1|2021-01-03 00:00:00| 0| 3| 2| 0|
|Store A|Product 2|2021-01-03 00:00:00| 2| 3| 2| 1|
|Store A|Product 3|2021-01-03 00:00:00| 2| 3| 2| 1|
|Store B|Product 1|2020-12-29 00:00:00| 0| 3| 1| 0|
|Store B|Product 2|2020-12-29 00:00:00| 1| 3| 1| 1|
|Store B|Product 3|2020-12-29 00:00:00| 0| 3| 1| 0|
|Store B|Product 1|2020-12-30 00:00:00| 0| 3| 1| 0|
|Store B|Product 2|2020-12-30 00:00:00| 1| 3| 1| 1|
|Store B|Product 3|2020-12-30 00:00:00| 0| 3| 1| 0|
|Store B|Product 1|2020-12-31 00:00:00| 0| 3| 1| 0|
|Store B|Product 2|2020-12-31 00:00:00| 1| 3| 1| 1|
+-------+---------+-------------------+-------+------------+-----------------------+---------+
only showing top 20 rows
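Since the frame now contains rows for every filled-in date, an optional last step is to join back to the (store, date) pairs that occur in the original data, if only those dates should appear in the output. A hypothetical final step, assuming the pipeline above is assigned to a variable result instead of ending in .show():

# Drop the synthetic filler dates again, keeping the corrected counts.
# The same date-to-timestamp cast is applied so the join keys line up.
original_dates = df.select("store", to_timestamp(to_date("date")).alias("date")).distinct()
result.join(original_dates, on=["store", "date"], how="inner").show()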