如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?

Posted

技术标签:

【中文标题】如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?【英文标题】:How to apply windowing function in pyspark over grouped data which needs an aggregation within an aggregation? 【发布时间】:2017-09-13 20:00:27 【问题描述】:

我有一个复杂的 winodwing 操作,我需要 pyspark 的帮助。

我有一些按srcdest分组的数据,我需要对每个组做如下操作: - 仅选择金额在socket2 中且 出现在socket1 中的行(对于该组中的所有行) - 应用该过滤条件后,在amounts 字段中求和

amounts     src    dest    socket1   socket2
10          1        2           A       B
11          1        2           B        C
12           1        2          C       D
510          1       2          C       D
550          1        2          B       C  
500          1        2          A       B
80            1         3          A        B

我想通过以下方式聚合它: 512+10 = 522,而80是src=1和dest=3的唯一记录

amounts     src    dest    
522          1        2      
80          1        3    

我从这里借用了样本数据:How to write Pyspark UDAF on multiple columns?

【问题讨论】:

您已经尝试过什么了吗?你能分享你的实验吗? 好的,请等一下 【参考方案1】:

您可以将数据框拆分为 2 个数据框,一个带有 socket1,另一个带有 socket2,然后使用 leftanti 连接而不是过滤(适用于 spark >= 2.0)。

首先让我们创建数据框:

df = spark.createDataFrame(
    sc.parallelize([
        [10,1,2,"A","B"],
        [11,1,2,"B","C"],
        [12,1,2,"C","D"],
        [510,1,2,"C","D"],
        [550,1,2,"B","C"],
        [500,1,2,"A","B"],
        [80,1,3,"A","B"]
    ]), 
    ["amounts","src","dest","socket1","socket2"]
)

现在拆分数据框:

火花 >= 2.0

df1 = df.withColumnRenamed("socket1", "socket").drop("socket2")
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
res = df2.join(df1, ["src", "dest", "socket"], "leftanti")

Spark 1.6

df1 = df.withColumnRenamed("socket1", "socket").drop("socket2").withColumnRenamed("amounts", "amounts1")
df2 = df.withColumnRenamed("socket2", "socket").drop("socket1")
res = df2.join(df1.alias("df1"), ["src", "dest", "socket"], "left").filter("amounts1 IS NULL").drop("amounts1")

最后是聚合:

import pyspark.sql.functions as psf
res.groupBy("src", "dest").agg(
    psf.sum("amounts").alias("amounts")
).show()

    +---+----+-------+
    |src|dest|amounts|
    +---+----+-------+
    |  1|   3|     80|
    |  1|   2|    522|
    +---+----+-------+

【讨论】:

and... +1 for 1k :) @Marie

以上是关于如何在 pyspark 中对需要在聚合中聚合的分组数据应用窗口函数?的主要内容,如果未能解决你的问题,请参考以下文章

如何在熊猫中对没有聚合功能的列进行分组?

如何通过不同级别的枢轴聚合然后在pyspark中进行内部连接?

在 PySpark Dataframe 中结合旋转和分组聚合

PySpark 聚合和分组依据

如何在 mongoDB 中使用聚合进行分组?

如何按列分组并聚合其余列