如何根据火花数据框中的值的累积总和为每一行分配一个类别?

Posted

技术标签:

【中文标题】如何根据火花数据框中的值的累积总和为每一行分配一个类别?【英文标题】:How to assign a category to each row based on the cumulative sum of values in spark dataframe? 【发布时间】:2021-10-18 06:57:01 【问题描述】:

我有一个 spark 数据框,由两列 [Employee 和 Salary] 组成,其中薪水按升序排列。

示例数据框

Expected Output: 
| Employee |salary |
| -------- | ------|
|  Emp1    |  10   |
| Emp2     |  20   |
| Emp3     |  30   |
| EMp4     |  35   |
| Emp5     |  36   |
| Emp6     |  50   |
| Emp7     |  70   |

我想对行进行分组,使每个组的聚合值小于 80,并为每个组分配一个类别,如下所示。我会继续逐行添加工资,直到总和超过80。一旦超过80,我将分配一个新的类别。

Expected Output: 
| Employee |salary | Category|
| -------- | ------|----------
|  Emp1    |  10   |A        |
| Emp2     |  20   |A        |
| Emp3     |  30   |A        |
| EMp4     |  35   |B        |
| Emp5     |  36   |B        |
| Emp6     |  50   |C        |
| Emp7     |  70   |D        |

有没有一种简单的方法可以在 spark scala 中做到这一点?

【问题讨论】:

dataFrame.select($”Employee”, $”salary”, assignACategory($"Employee”, $”salary" )) 对你有用吗? 工资限制可以试试dataFrame.select($”Employee”, $”salary”, assignACategory($"Employee”, $”salary" )).filter($”salary” < 80) @James 我想在添加行值后分配一个类别 Emp1 + Emp2 + Emp3 = 70 你需要按升序计算salary的cumulative SUM。比简单的整数除以 70 并映射到类别。 我认为累积总和不会起作用。这是一个反例 -> 40 50 60。我们应该得到 40 -> A, 50 -> B, 60 -> C 对吧? cumsum=40 90 150。如果我们除以 80,我们得到 0、1、1,这会将 50 和 60 放在一起。只有当我们可以假设所有类别都是完整的(即工资总和正好等于 80)时,累积总和才会起作用。 【参考方案1】:

要解决您的问题,您可以在 window 上使用自定义 aggregate function

首先,您需要创建自定义聚合函数。聚合函数由累加器(buffer)定义,在处理新行(reduce 函数)或遇到另一个累加器(merge 函数)时,它将被初始化(zero 值)和更新。最后返回累加器(finish函数)

在你的情况下,累加器应该保留两条信息:

当前员工类别 属于当前类别的以前员工的工资总和

要存储这些信息,您可以使用元组(Int, Int),第一个元素是当前类别,第二个元素是当前类别以前员工的工资总和:

你用(0, 0)初始化这个元组。 遇到新行时,如果之前的薪水和当前行的薪水之和超过80,则递增类别并用当前行的薪水重新初始化之前的薪水之和,否则将当前行的薪水添加到之前的薪水'总和。 由于您将使用窗口函数,因此您将按顺序处理行,因此您无需与另一个累加器实现合并。 最后,由于您只需要类别,因此您只返回累加器的第一个元素。

所以我们得到以下聚合器实现:

import org.apache.spark.sql.Encoder, Encoders
import org.apache.spark.sql.expressions.Aggregator

object Labeler extends Aggregator[Int, (Int, Int), Int] 
  override def zero: (Int, Int) = (0, 0)

  override def reduce(catAndSum: (Int, Int), salary: Int): (Int, Int) = 
    if (catAndSum._2 + salary > 80)
      (catAndSum._1 + 1, salary)
    else
      (catAndSum._1, catAndSum._2 + salary)
  

  override def merge(catAndSum1: (Int, Int), catAndSum2: (Int, Int)): (Int, Int) = 
    throw new NotImplementedError("should be used only over a windows function")
  

  override def finish(catAndSum: (Int, Int)): Int = catAndSum._1

  override def bufferEncoder: Encoder[(Int, Int)] = Encoders.tuple(Encoders.scalaInt, Encoders.scalaInt)

  override def outputEncoder: Encoder[Int] = Encoders.scalaInt

拥有聚合器后,您可以使用 udaf 函数将其转换为 spark 聚合函数。

然后,您在所有数据框上创建窗口并按薪水排序,并在此窗口上应用您的 spark 聚合函数:

import org.apache.spark.sql.expressions.Window
import org.apache.spark.sql.functions.col, udaf

val labeler = udaf(Labeler)
val window = Window.orderBy("salary")

val result = dataframe.withColumn("category", labeler(col("salary")).over(window))

使用您的示例作为输入数据框,您将获得以下结果数据框:

+--------+------+--------+
|employee|salary|category|
+--------+------+--------+
|Emp1    |10    |0       |
|Emp2    |20    |0       |
|Emp3    |30    |0       |
|Emp4    |35    |1       |
|Emp5    |36    |1       |
|Emp6    |50    |2       |
|Emp7    |70    |3       |
+--------+------+--------+

【讨论】:

以上是关于如何根据火花数据框中的值的累积总和为每一行分配一个类别?的主要内容,如果未能解决你的问题,请参考以下文章

R/dplyr:使用循环创建滞后并根据列名计算累积总和

计算火花数据框中所有列(300 列)的每个不同值的出现次数

如何获取火花行的 value_counts?

如何从带有火花的数据框中找到最大长度的唯一行?

根据火花数据框中另一列的值查找列的最大值?

SQL Server - 重叠数据的累积总和 - 获取总和达到给定值的日期