pyspark:聚合列中最频繁的值

Posted

技术标签:

【中文标题】pyspark:聚合列中最频繁的值【英文标题】:pyspark: aggregate on the most frequent value in a column 【发布时间】:2017-08-11 11:59:42 【问题描述】:
  aggregrated_table = df_input.groupBy('city', 'income_bracket') \
        .agg(
       count('suburb').alias('suburb'),
       sum('population').alias('population'),
       sum('gross_income').alias('gross_income'),
       sum('no_households').alias('no_households'))

希望按城市和收入等级分组,但在每个城市中,某些郊区的收入等级不同。如何按每个城市出现频率最高的收入等级进行分组?

例如:

city1 suburb1 income_bracket_10 
city1 suburb1 income_bracket_10 
city1 suburb2 income_bracket_10 
city1 suburb3 income_bracket_11 
city1 suburb4 income_bracket_10 

将按income_bracket_10 分组

【问题讨论】:

你能告诉我们想要的输出吗? 【参考方案1】:

在聚合之前使用窗口函数可能会奏效:

from pyspark.sql import Window
import pyspark.sql.functions as psf

w = Window.partitionBy('city')
aggregrated_table = df_input.withColumn(
    "count", 
    psf.count("*").over(w)
).withColumn(
    "rn", 
    psf.row_number().over(w.orderBy(psf.desc("count")))
).filter("rn = 1").groupBy('city', 'income_bracket').agg(
   psf.count('suburb').alias('suburb'),
   psf.sum('population').alias('population'),
   psf.sum('gross_income').alias('gross_income'),
   psf.sum('no_households').alias('no_households'))

您还可以在聚合后使用窗口函数,因为您要保留 (city, income_bracket) 出现次数。

【讨论】:

完美 - 谢谢!我确实遇到了一些优先于实际值的空值问题,但是将您的解决方案与***.com/questions/35142216/… 结合使用,它可以工作!【参考方案2】:

你不一定需要窗口函数:

aggregrated_table = (
    df_input.groupby("city", "suburb","income_bracket")
    .count()
    .withColumn("count_income", F.array("count", "income_bracket"))
    .groupby("city", "suburb")
    .agg(F.max("count_income").getItem(1).alias("most_common_income_bracket"))
) 

我认为这可以满足您的要求。我真的不知道它是否比基于窗口的解决方案表现更好。

【讨论】:

mfcabrera 的解决方案更适合大型数据集,您不会将整个数据集强制到单个节点中。【参考方案3】:

当在 F.array 列上使用 F.max 时,mfcabrera 的解决方案给出了错误的结果,因为 ArrayType 中的值被视为字符串并且整数 max 没有按预期工作。

以下解决方案有效。

w = Window.partitionBy('city', "suburb").orderBy(f.desc("count"))

aggregrated_table = (
    input_df.groupby("city", "suburb","income_bracket")
    .count()
    
    .withColumn("max_income", f.row_number().over(w2))
    .filter(f.col("max_income") == 1).drop("max_income")
) 
aggregrated_table.display()

【讨论】:

以上是关于pyspark:聚合列中最频繁的值的主要内容,如果未能解决你的问题,请参考以下文章

在 groupby 操作 PySpark 中聚合列中的稀疏向量

Pyspark - 从每列中选择不同的值

如何将 pyspark 数据框列中的值与 pyspark 中的另一个数据框进行比较

PySpark DataFrame的逐行聚合

如何过滤 PySpark 中数组列中的值?

如何根据 Pyspark 中数组列中的值创建新列