Scala Spark Dataframe 创建一个新列,其中包含另一列的最大先前值和当前值
Posted
技术标签:
【中文标题】Scala Spark Dataframe 创建一个新列,其中包含另一列的最大先前值和当前值【英文标题】:Scala Spark Dataframe Create a new column with maximum of previous and current value of another column 【发布时间】:2020-05-06 17:05:04 【问题描述】:我有一个数据框,其中只有 category
列和 A 列,如下所示。我想填充 B 列,以便比较 A 的当前值和 B 的先前值并存储每个类别的最大值。尝试了 Windows 功能、滞后、类别的最大值等,但我面临的最大挑战是如何在比较两个值时记住早期的最大值。
+---+--------+--+--+
id | category | A | B |
+---+--------+--+--+
1 Fruit 1 1
2 Fruit 5 5
3 Fruit 3 5
4 Fruit 4 5
1 Dessert 4 4
2 Dessert 2 4
1 Veggies 11 11
2 Veggies 7 11
3 Veggies 12 12
4 Veggies 3 12
---+------+---+----+-
【问题讨论】:
你如何定义“以前的”值? 好的,还有一列称为 id,它是数字的,我们已按升序对其进行了排序。现在,第一行的第一个值是 A,并且开始 B = A 为第一行。对于所有接下来的行,B = max (prev B, current A)。 如果B = max ( prev B, current A)
那么示例行 5,6 中的 B 值应该是 = 5 ?
@chlebek,因为它是该类别的第一个元素
【参考方案1】:
使用运行最大值 A
应该可以解决问题:
df
.withColumn("B", max($"A").over(Window.partitionBy($"category").orderBy($"id")))
【讨论】:
【参考方案2】:我很难用 Spark SQL 表达这一点,但使用 Dataset API 通过函数式编程进行管理
scala> case class Food(category: String, a: Int, b: Option[Int] = None)
defined class Food
scala> val ds = spark.createDataset(
| List(
| Food("Fruit", 1),
| Food("Fruit", 5),
| Food("Fruit", 3),
| Food("Fruit", 4),
| Food("Dessert", 4),
| Food("Dessert", 2),
| Food("Veggies", 11),
| Food("Veggies", 7),
| Food("Veggies", 12),
| Food("Veggies", 3)
| )
| )
ds: org.apache.spark.sql.Dataset[Food] = [category: string, a: int ... 1 more field]
scala> ds.show
+--------+---+----+
|category| a| b|
+--------+---+----+
| Fruit| 1|null|
| Fruit| 5|null|
| Fruit| 3|null|
| Fruit| 4|null|
| Dessert| 4|null|
| Dessert| 2|null|
| Veggies| 11|null|
| Veggies| 7|null|
| Veggies| 12|null|
| Veggies| 3|null|
+--------+---+----+
scala> :paste
// Entering paste mode (ctrl-D to finish)
ds.groupByKey(_.category)
.flatMapGroups (key, iter) =>
if (iter.hasNext)
val head = iter.next
iter.scanLeft(head.copy(b = Some(head.a))) (x, y) =>
val a = x.b.map(b => if(x.a > b) x.a else b).getOrElse(x.a)
y.copy(b = if(y.a > a) Some(y.a) else Some(a))
else iter
.show
// Exiting paste mode, now interpreting.
+--------+---+---+
|category| a| b|
+--------+---+---+
| Veggies| 11| 11|
| Veggies| 7| 11|
| Veggies| 12| 12|
| Veggies| 3| 12|
| Dessert| 4| 4|
| Dessert| 2| 4|
| Fruit| 1| 1|
| Fruit| 5| 5|
| Fruit| 3| 5|
| Fruit| 4| 5|
+--------+---+---+
【讨论】:
感谢 Ryan 为解决问题付出了这么多努力。数据集对我来说不是一个选择,因为我们只使用 Dataframe,偶尔使用 rdd。如果我无法通过使用数据框解决问题(请参阅 Raphael 的解决方案上方),我必须采用您的解决方案并在 rdd 中应用。再次感谢。以上是关于Scala Spark Dataframe 创建一个新列,其中包含另一列的最大先前值和当前值的主要内容,如果未能解决你的问题,请参考以下文章
Spark (Scala) - 在 DataFrame 中恢复爆炸
spark dataframe 和 scala Map互相转换
Spark-Scala:使用异常处理将固定宽度线解析为 Dataframe Api