Spark Scala 聚合组 Dataframe

Posted

技术标签:

【中文标题】Spark Scala 聚合组 Dataframe【英文标题】:Spark Scala aggregation group Dataframe 【发布时间】:2019-12-23 16:30:35 【问题描述】:

我有输入数据框并且必须生成输出数据框。 在输入数据框上,我必须对几列进行分组,如果该组具有另一列的sum 该组的某个值,那么我必须使用 x 为该组的每个成员更新一列。 因此,我将获得几个组,并且必须使用 x 更新其中一个列,并且对于在该列中没有进入任何组值的行,不得更改。

喜欢:

Job id , job name, department, age, old.

前 3 列被分组,sum(age) = 100 然后 old 得到 x 组中的所有行 他们将是几组。 并且输出数据帧将具有与输入数据相同的行数。

val dfIn = 职位 ID、职位名称、部门、年龄、年龄

     24        Dev         Sales       30    0
     24        Dev         Sales       40    0
     24        Dev         Sales       20    0
     24        Dev         Sales       10    0
     24        Dev         HR          30    0
     24        Dev         HR          20    0
     24        Dev         Retail      50    0
     24        Dev         Retail      50    0

val dfOut= 工作 id , 工作名称 , 部门 , 年龄 , 老

     24        Dev         Sales       30    x
     24        Dev         Sales       40    x
     24        Dev         Sales       20    x
     24        Dev         Sales       10    x
     24        Dev         HR          30    0
     24        Dev         HR          20    0
     24        Dev         Retail      50    x
     24        Dev         Retail      50    x

【问题讨论】:

如果您不提供一些输入数据样本并显示所需的输出,我怀疑有人会回答这个问题。 对不起,我是在路上写的,我一到电脑就会提供数据。 【参考方案1】:

只需使用Window function 计算sum_age 并在sum_age = 100 时使用when/otherwise 影响Xold 列,否则保持相同的值0

import org.apache.spark.sql.expressions.Window  

val df = Seq(
       (24, "Dev", "Sales", 30, "0"), (24, "Dev", "Sales", 40, "0"),
       (24, "Dev", "Sales", 20, "0"), (24, "Dev", "Sales", 10, "0"),
       (24, "Dev", "HR", 30, "0"), (24, "Dev", "HR", 20, "0"),
       (24, "Dev", "Retail", 50, "0"), (24, "Dev", "Retail", 50, "0")
).toDF("job_id", "job_name", "department", "age", "old")


val w = Window.partitionBy($"job_id", $"job_name", $"department").orderBy($"job_id")
val dfOut = df.withColumn("sum_age", sum(col("age")).over(w))
             .withColumn("old", when($"sum_age" === lit(100), lit("X")).otherwise($"old"))
             .drop($"sum_age")


dfOut.show()

+------+--------+----------+---+---+
|job_id|job_name|department|age|old|
+------+--------+----------+---+---+
|    24|     Dev|        HR| 30|  0|
|    24|     Dev|        HR| 20|  0|
|    24|     Dev|    Retail| 50|  X|
|    24|     Dev|    Retail| 50|  X|
|    24|     Dev|     Sales| 30|  X|
|    24|     Dev|     Sales| 40|  X|
|    24|     Dev|     Sales| 20|  X|
|    24|     Dev|     Sales| 10|  X|
+------+--------+----------+---+---+

【讨论】:

以上是关于Spark Scala 聚合组 Dataframe的主要内容,如果未能解决你的问题,请参考以下文章

Dataframe Spark Scala中的最后一个聚合函数

Spark Streaming Scala 将不同结构的 json 组合成一个 DataFrame

Spark Scala聚合函数,用于查找组中列值的出现次数

使用 Scala 将多列转换为 Spark Dataframe 上的一列地图

在 groupby 之后将 Spark DataFrame 的行聚合到 String

Spark Dataframe:从 Map 类型生成元组数组