如何相对于 spark 中的 col 值在一组行上设置增量 id
Posted
技术标签:
【中文标题】如何相对于 spark 中的 col 值在一组行上设置增量 id【英文标题】:how to set a increment id over set of rows with respect to a col value in spark 【发布时间】:2018-04-27 08:43:41 【问题描述】:您好,我有一个数据集如下所示:
我的意见:
+----------+----------------+
| id | flag |
+----------+----------------|
| 1 | false |
+----------+----------------|
| 2 | true |
+----------+----------------|
| 3 | false |
+----------+----------------|
| 4 | true |
+----------+----------------|
| 5 | false |
+----------+----------------|
| 6 | false |
+----------+----------------|
| 7 | true |
+----------+----------------+
输出:
+----------+----------------+----------------------------+
| id | flag | new_col |
+----------+---------------------------------------------+
| 1 | false | 1 |
+----------+---------------------------------------------+
| 2 | true | 1 |
+----------+----------------+----------------------------+
| 3 | false | 3 |
+----------+----------------+----------------------------+
| 4 | true | 3 |
+----------+----------------+----------------------------+
| 5 | false | 5 |
+----------+----------------+----------------------------+
| 6 | false | 6 |
+----------+----------------+----------------------------+
| 7 | true | 6 |
+----------+----------------+----------------------------+
每个 false 值都会将 new_col 值更改为其 id 等等... 有什么帮助吗?
【问题讨论】:
这是我的情况,我想知道是否可以使用任何火花窗口功能来实现,如果是的话,我有什么想法吗?我认为我举的例子很清楚地解释了这个问题 您可以使用窗口函数,但您必须使用单个分区,这会影响性能。如果您的数据集很小,我建议您使用窗口解决方案。 你可以去看看! ***.com/questions/47643416/… 我的数据集大小约为 3000 行。我没有找到任何 spark 窗口函数(领先、滞后、排名、...)可以解决我的问题。你能在任何建议的函数或示例中帮助我吗 谢谢大家,我会检查@nabongs的答案 【参考方案1】:使用较小的数据集,您可以执行以下操作:
- 使用
when
-otherwise
和withColumn
创建一个新列,该列将采用id
或null
的值,具体取决于flag
的值,在SQL 中相当于:
CASE WHEN FLAG = 'TRUE' THEN ID ELSE NULL END AS NEW_COL
-
然后使用
coalesce
在一个Window上用last
替换所有空值,得到最后一个非空值:
df.show
//+---+-----+
//| id| flag|
//+---+-----+
//| 1|false|
//| 2| true|
//| 3| true|
//| 4| true|
//| 5|false|
//| 6| true|
//| 7| true|
//+---+-----+
//Defining a Window over which we will call the function
import org.apache.spark.sql.expressions.Window
//No partitionBy clause so all the data will move to a single partition
//You'll also get a warning related to that
val w = Window.orderBy($"id")
//The value of `id` will be the same where `flag` is `false`
//last will be called over the window to fill the null values
df.withColumn("new_col" , when($"flag" === lit(false) , $"id").otherwise(null))
.withColumn("new_col" , coalesce($"new_col" , last($"new_col", true).over(w) ) )
.show
//+---+-----+-------+
//|id |flag |new_col|
//+---+-----+-------+
//|1 |false|1 |
//|2 |true |1 |
//|3 |true |1 |
//|4 |true |1 |
//|5 |false|5 |
//|6 |true |5 |
//|7 |true |5 |
//+---+-----+-------+
【讨论】:
非常好的一个 :) 如果你解释一下 coalesce 的作用会有所帮助 不@sandev,他正在使用内置功能spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/…,而您正在谈论另一个合并 @RameshMaharjan 谢谢,已添加解释。 如果我们有多个假值一个接一个又一个真值,真值如何取最后一个假值?【参考方案2】:如果你想使用rdd
的方式,那么你可以将所有数据传递给一个执行器,然后执行 for 循环,如下所示
df.rdd.coalesce(1).mapPartitions(iterator =>
var y = "1"
for (x <- iterator) yield
val id = x.getAs[String]("id")
val flag = x.getAs[Boolean]("flag")
if(flag == false)
y = id
newdf(id, flag, y)
else
newdf(id, flag, y)
).toDF()
为此你需要一个案例类
case class newdf(id:String, flag:Boolean, new_id:String)
你也可以不用案例类,但我更喜欢使用案例类
【讨论】:
我可以直接使用 dataset.mapPartitions... 而不使用 coalesce(1) 如果您在集群中进行处理,则不会为您提供正确的结果。完成合并以将数据传输到一个分区中 从性能的角度来看,使用 mapPartition 或上面给出的答案哪个答案更好 测试并分析它们。并接受一个表现更好的人。这一切都取决于你以上是关于如何相对于 spark 中的 col 值在一组行上设置增量 id的主要内容,如果未能解决你的问题,请参考以下文章
如何在一组行之后或有条件地在没有 PL/SQL 块的情况下增加 oracle 序列?