PySpark - 窗口函数导致新列
Posted
技术标签:
【中文标题】PySpark - 窗口函数导致新列【英文标题】:PySpark - window function results in new column 【发布时间】:2021-11-30 13:42:07 【问题描述】:我有以下 PySpark 数据:
+---------+---------+---------+-------------------+
|event_id |user_id | status| created_at|
+---------+---------+---------+-------------------+
| 1| 2| a|2017-05-26 15:12:54|
| 1| 2| b|2017-05-26 15:12:53|
| 2| 1| a|2017-05-26 15:12:56|
| 1| 2| b|2017-05-26 16:12:57|
| 2| 1| c|2017-05-26 16:12:58|
| 2| 1| b|2017-05-26 16:12:58|
| 3| 1| b|2017-05-26 14:17:58|
+---------+---------+---------+-------------------+
对于每对 (event_id, user_id)
(这是主键,数据从数据库中提取)我想为每个 status
创建基于最高 created_at
的新列,对于没有数据的对使用 null
值.以上数据:
+---------+---------+-------------------+-------------------+-------------------+
|event_id |user_id | a| b| c|
+---------+---------+-------------------+-------------------+-------------------+
| 1| 2|2017-05-26 15:12:54|2017-05-26 16:12:57| null|
| 2| 1|2017-05-26 15:12:56| null|2017-05-26 16:12:58|
| 3| 1| null|2017-05-26 14:17:58| null|
+---------+---------+-------------------+-------------------+-------------------+
我的解决方案相当复杂,速度很慢,我很确定它可以优化:
for status in ["a", "b", "c"]:
df2 = df.filter(F.col("status") == status).groupBy(["event_id", "user_id"]).agg(F.max("created_at").alias(status))
df = (
df
.join(
df2,
on=(
(df["event_id"] == df2["event_id"]) &
(df["user_id"] == df2["user_id"]) &
(df["status"] == status)
),
how="left_outer"
)
.select(df["*"], status)
)
df2 = (
df
.drop("status", "created_at")
.groupBy(["event_id", "user_id"])
.agg(F.max("a").alias("a"), F.max("b").alias("b"), F.max("c").alias("c"))
)
# df2 has the result
我可以在这里避免循环中的 JOIN,或者至少将 JOIN + groupBy 和 max 减少到一步吗?就像现在一样,我只是按顺序处理状态,这根本不可扩展。
【问题讨论】:
请尝试在 spark 中使用枢轴 @LearnHadoop 这正是我想要的,你能发布答案吗? 试试这个:df.groupBy("event_id","user_id").pivot("status").agg(first("created_at")).show - 在scala中返回..我想pyspark 代码也类似 @LearnHadoop 是的,我明白这一点,只需在帖子中发布常规答案,以便我接受并关闭问题 【参考方案1】:试试这个,
df.groupBy("event_id","user_id").pivot("status").agg(first("created_at")).show
【讨论】:
以上是关于PySpark - 窗口函数导致新列的主要内容,如果未能解决你的问题,请参考以下文章
pyspark 是不是支持窗口函数(例如 first、last、lag、lead)?