如何相对于 spark 中的 col 值在一组行上设置增量 id

Posted

技术标签:

【中文标题】如何相对于 spark 中的 col 值在一组行上设置增量 id【英文标题】:how to set a increment id over set of rows with respect to a col value in spark 【发布时间】:2018-04-27 08:43:41 【问题描述】:

您好,我有一个数据集如下所示:

我的意见:

+----------+----------------+
|  id      |  flag          |
+----------+----------------|
|  1       | false          |  
+----------+----------------|
|  2       | true           | 
+----------+----------------|
|  3       | false          |
+----------+----------------|
|  4       | true           |  
+----------+----------------|
|  5       | false          |
+----------+----------------|
|  6       | false          |  
+----------+----------------|
|  7       | true           |
+----------+----------------+

输出:

+----------+----------------+----------------------------+
|  id      |  flag          |  new_col                   |
+----------+---------------------------------------------+
|  1       | false          |      1                     |
+----------+---------------------------------------------+
|  2       | true           |      1                     |
+----------+----------------+----------------------------+
|  3       | false          |      3                     |
+----------+----------------+----------------------------+
|  4       | true           |      3                     |
+----------+----------------+----------------------------+
|  5       | false          |      5                     |
+----------+----------------+----------------------------+
|  6       | false          |      6                     |
+----------+----------------+----------------------------+
|  7       | true           |      6                     |
+----------+----------------+----------------------------+

每个 false 值都会将 new_col 值更改为其 id 等等... 有什么帮助吗?

【问题讨论】:

这是我的情况,我想知道是否可以使用任何火花窗口功能来实现,如果是的话,我有什么想法吗?我认为我举的例子很清楚地解释了这个问题 您可以使用窗口函数,但您必须使用单个分区,这会影响性能。如果您的数据集很小,我建议您使用窗口解决方案。 你可以去看看! ***.com/questions/47643416/… 我的数据集大小约为 3000 行。我没有找到任何 spark 窗口函数(领先、滞后、排名、...)可以解决我的问题。你能在任何建议的函数或示例中帮助我吗 谢谢大家,我会检查@nabongs的答案 【参考方案1】:

使用较小的数据集,您可以执行以下操作:

    使用when-otherwisewithColumn 创建一个新列,该列将采用idnull 的值,具体取决于flag 的值,在SQL 中相当于:
CASE WHEN FLAG = 'TRUE' THEN ID ELSE NULL END AS NEW_COL
    然后使用coalesce在一个Window上用last替换所有空值,得到最后一个非空值:
df.show
//+---+-----+
//| id| flag|
//+---+-----+
//|  1|false|
//|  2| true|
//|  3| true|
//|  4| true|
//|  5|false|
//|  6| true|
//|  7| true|
//+---+-----+

//Defining a Window over which we will call the function
import org.apache.spark.sql.expressions.Window

//No partitionBy clause so all the data will move to a single partition
//You'll also get a warning related to that
val w = Window.orderBy($"id")

//The value of `id` will be the same where `flag` is `false`
//last will be called over the window to fill the null values   
df.withColumn("new_col" , when($"flag" === lit(false) , $"id").otherwise(null))
  .withColumn("new_col" , coalesce($"new_col" , last($"new_col", true).over(w) ) )
  .show   
//+---+-----+-------+
//|id |flag |new_col|
//+---+-----+-------+
//|1  |false|1      |
//|2  |true |1      |
//|3  |true |1      |
//|4  |true |1      |
//|5  |false|5      |
//|6  |true |5      |
//|7  |true |5      |
//+---+-----+-------+

【讨论】:

非常好的一个 :) 如果你解释一下 coalesce 的作用会有所帮助 不@sandev,他正在使用内置功能spark.apache.org/docs/2.3.0/api/java/org/apache/spark/sql/…,而您正在谈论另一个合并 @RameshMaharjan 谢谢,已添加解释。 如果我们有多个假值一个接一个又一个真值,真值如何取最后一个假值?【参考方案2】:

如果你想使用rdd 的方式,那么你可以将所有数据传递给一个执行器,然后执行 for 循环,如下所示

df.rdd.coalesce(1).mapPartitions(iterator => 
  var y = "1"
  for (x <- iterator) yield 
    val id = x.getAs[String]("id")
    val flag = x.getAs[Boolean]("flag")
    if(flag == false)
      y = id
      newdf(id, flag, y)
    else
      newdf(id, flag, y)
    
  
).toDF()

为此你需要一个案例类

case class newdf(id:String, flag:Boolean, new_id:String)

也可以不用案例类,但我更喜欢使用案例类

【讨论】:

我可以直接使用 dataset.mapPartitions... 而不使用 coalesce(1) 如果您在集群中进行处理,则不会为您提供正确的结果。完成合并以将数据传输到一个分区中 从性能的角度来看,使用 mapPartition 或上面给出的答案哪个答案更好 测试并分析它们。并接受一个表现更好的人。这一切都取决于你

以上是关于如何相对于 spark 中的 col 值在一组行上设置增量 id的主要内容,如果未能解决你的问题,请参考以下文章

如何在一组行之后或有条件地在没有 PL/SQL 块的情况下增加 oracle 序列?

如果列中的值在一组值列表中,则过滤数据框行[重复]

调整相对于 2 列的选择以在一列中返回最大值以用于另一列中的重复条目

如何根据列中的一组行对数据框进行排名?

如何填充一列以区分 Impala 组中的一组行与其他行?

r - 检查向量上的每个值在一组区​​域上的次数