PySpark - 窗口函数导致新列

Posted

技术标签:

【中文标题】PySpark - 窗口函数导致新列【英文标题】:PySpark - window function results in new column 【发布时间】:2021-11-30 13:42:07 【问题描述】:

我有以下 PySpark 数据:

+---------+---------+---------+-------------------+
|event_id |user_id  |   status|         created_at|
+---------+---------+---------+-------------------+
|        1|        2|        a|2017-05-26 15:12:54|
|        1|        2|        b|2017-05-26 15:12:53|
|        2|        1|        a|2017-05-26 15:12:56|
|        1|        2|        b|2017-05-26 16:12:57|
|        2|        1|        c|2017-05-26 16:12:58|
|        2|        1|        b|2017-05-26 16:12:58|
|        3|        1|        b|2017-05-26 14:17:58|
+---------+---------+---------+-------------------+

对于每对 (event_id, user_id)(这是主键,数据从数据库中提取)我想为每个 status 创建基于最高 created_at 的新列,对于没有数据的对使用 null 值.以上数据:

+---------+---------+-------------------+-------------------+-------------------+
|event_id |user_id  |                  a|                  b|                  c|
+---------+---------+-------------------+-------------------+-------------------+
|        1|        2|2017-05-26 15:12:54|2017-05-26 16:12:57|               null|
|        2|        1|2017-05-26 15:12:56|               null|2017-05-26 16:12:58|
|        3|        1|               null|2017-05-26 14:17:58|               null|
+---------+---------+-------------------+-------------------+-------------------+

我的解决方案相当复杂,速度很慢,我很确定它可以优化:

for status in ["a", "b", "c"]:
    df2 = df.filter(F.col("status") == status).groupBy(["event_id", "user_id"]).agg(F.max("created_at").alias(status))
    df = (
        df
        .join(
            df2, 
            on=(
                (df["event_id"] == df2["event_id"]) & 
                (df["user_id"] == df2["user_id"]) & 
                (df["status"] == status)
            ),
            how="left_outer"
        )
        .select(df["*"], status)
    )

df2 = (
    df
    .drop("status", "created_at")
    .groupBy(["event_id", "user_id"])
    .agg(F.max("a").alias("a"), F.max("b").alias("b"), F.max("c").alias("c"))
)

# df2 has the result

我可以在这里避免循环中的 JOIN,或者至少将 JOIN + groupBy 和 max 减少到一步吗?就像现在一样,我只是按顺序处理状态,这根本不可扩展。

【问题讨论】:

请尝试在 spark 中使用枢轴 @LearnHadoop 这正是我想要的,你能发布答案吗? 试试这个:df.groupBy("event_id","user_id").pivot("status").agg(first("created_at")).show - 在scala中返回..我想pyspark 代码也类似 @LearnHadoop 是的,我明白这一点,只需在帖子中发布常规答案,以便我接受并关闭问题 【参考方案1】:

试试这个,

df.groupBy("event_id","user_id").pivot("status").agg(first("created_at")).show

【讨论】:

以上是关于PySpark - 窗口函数导致新列的主要内容,如果未能解决你的问题,请参考以下文章

如何使用新列scala在内存转换中应用窗口函数

Pandas:在数据框中创建一个新列,该列是滚动窗口的函数

pyspark 是不是支持窗口函数(例如 first、last、lag、lead)?

pyspark 使用过滤器应用 DataFrame 窗口函数

PySpark 中的窗口函数和条件过滤器

如何创建与列相关的大小的 Pyspark 窗口函数