Scala Spark Dataframe 创建一个新列,其中包含另一列的最大先前值和当前值

Posted

技术标签:

【中文标题】Scala Spark Dataframe 创建一个新列,其中包含另一列的最大先前值和当前值【英文标题】:Scala Spark Dataframe Create a new column with maximum of previous and current value of another column 【发布时间】:2020-05-06 17:05:04 【问题描述】:

我有一个数据框,其中只有 category 列和 A 列,如下所示。我想填充 B 列,以便比较 A 的当前值和 B 的先前值并存储每个类别的最大值。尝试了 Windows 功能、滞后、类别的最大值等,但我面临的最大挑战是如何在比较两个值时记住早期的​​最大值。

 +---+--------+--+--+
 id |  category | A | B |
 +---+--------+--+--+
  1  Fruit   1   1
  2  Fruit   5   5
  3  Fruit   3   5 
  4  Fruit   4   5 
  1  Dessert 4   4
  2  Dessert 2   4
  1  Veggies 11  11
  2  Veggies 7   11
  3  Veggies 12  12
  4  Veggies 3   12
  ---+------+---+----+-

【问题讨论】:

你如何定义“以前的”值? 好的,还有一列称为 id,它是数字的,我们已按升序对其进行了排序。现在,第一行的第一个值是 A,并且开始 B = A 为第一行。对于所有接下来的行,B = max (prev B, current A)。 如果 B = max ( prev B, current A) 那么示例行 5,6 中的 B 值应该是 = 5 ? @chlebek,因为它是该类别的第一个元素 【参考方案1】:

使用运行最大值 A 应该可以解决问题:

df
 .withColumn("B", max($"A").over(Window.partitionBy($"category").orderBy($"id")))

【讨论】:

【参考方案2】:

我很难用 Spark SQL 表达这一点,但使用 Dataset API 通过函数式编程进行管理

scala>   case class Food(category: String, a: Int, b: Option[Int] = None)
defined class Food

scala>     val ds = spark.createDataset(
     |       List(
     |         Food("Fruit", 1),
     |         Food("Fruit", 5),
     |         Food("Fruit", 3),
     |         Food("Fruit", 4),
     |         Food("Dessert", 4),
     |         Food("Dessert", 2),
     |         Food("Veggies", 11),
     |         Food("Veggies", 7),
     |         Food("Veggies", 12),
     |         Food("Veggies", 3)
     |       )
     |     )
ds: org.apache.spark.sql.Dataset[Food] = [category: string, a: int ... 1 more field]

scala> ds.show
+--------+---+----+
|category|  a|   b|
+--------+---+----+
|   Fruit|  1|null|
|   Fruit|  5|null|
|   Fruit|  3|null|
|   Fruit|  4|null|
| Dessert|  4|null|
| Dessert|  2|null|
| Veggies| 11|null|
| Veggies|  7|null|
| Veggies| 12|null|
| Veggies|  3|null|
+--------+---+----+


scala> :paste
// Entering paste mode (ctrl-D to finish)

    ds.groupByKey(_.category)
      .flatMapGroups  (key, iter) =>
        if (iter.hasNext) 
          val head = iter.next
          iter.scanLeft(head.copy(b = Some(head.a)))  (x, y) =>
            val a = x.b.map(b => if(x.a > b) x.a else b).getOrElse(x.a)
            y.copy(b = if(y.a > a) Some(y.a) else Some(a))
          
         else iter
      
      .show

// Exiting paste mode, now interpreting.

+--------+---+---+
|category|  a|  b|
+--------+---+---+
| Veggies| 11| 11|
| Veggies|  7| 11|
| Veggies| 12| 12|
| Veggies|  3| 12|
| Dessert|  4|  4|
| Dessert|  2|  4|
|   Fruit|  1|  1|
|   Fruit|  5|  5|
|   Fruit|  3|  5|
|   Fruit|  4|  5|
+--------+---+---+

【讨论】:

感谢 Ryan 为解决问题付出了这么多努力。数据集对我来说不是一个选择,因为我们只使用 Dataframe,偶尔使用 rdd。如果我无法通过使用数据框解决问题(请参阅 Raphael 的解决方案上方),我必须采用您的解决方案并在 rdd 中应用。再次感谢。

以上是关于Scala Spark Dataframe 创建一个新列,其中包含另一列的最大先前值和当前值的主要内容,如果未能解决你的问题,请参考以下文章

Spark (Scala) - 在 DataFrame 中恢复爆炸

scala spark dataframe 修改字段类型

spark dataframe 和 scala Map互相转换

Spark-Scala:使用异常处理将固定宽度线解析为 Dataframe Api

Spark/Scala:对带有数组类型列的 DataFrame 中的某些组件的操作

使用Scala在Spark中创建DataFrame时出错