spark根据新行删除前一行,条件匹配

Posted

技术标签:

【中文标题】spark根据新行删除前一行,条件匹配【英文标题】:spark delete previous row based on the new row with some conditions match 【发布时间】:2018-10-13 14:19:43 【问题描述】:

我有如下数据框

type   f1   f2  value 

1      a    xy    11

2      b    ab    13

3      c    na    16

3      c    dir    18

3      c    ls    23

我必须删除上一行,某些条件与下一行匹配,

例如从上表中,当 type == type(row-1) && f1 == f1(row-1) && abs(value - value (row-1))

所以我的桌子应该如下所示

type   f1   f2  value 

1      a    xy    11

2      b    ab    13

3      c    dir   18

3      c    ls    30 

我认为我们可以利用滞后或领先特征,但无法获得准确的逻辑

【问题讨论】:

您可能必须使用用户定义的函数并添加一个新列,如果条件匹配则为 1,否则为 0,然后过滤该列。尽管要注意 udf 本质上比原生 spark 函数慢 问题是数据是按类型排序的,你想消除类型之间的重复还是只在下一行? 【参考方案1】:

是的,可以使用.lead() 完成

import org.apache.spark.sql.expressions._
//define window specification
val windowSpec = Window.partitionBy($"type",$"f1").orderBy($"type")

val inputDF = sc.parallelize(List((1,"a","xy",11),(2,"b","ab",13),(3,"c","na",16),(3,"c","dir",18),(3,"c","ls",23))).toDF("type","f1","f2","value")

inputDF.withColumn("leadValue",lead($"value",1).over(windowSpec))
  .withColumn("result", when(abs($"leadValue" - $"value") <= 2, 1).otherwise(0)) //check for condition
  .filter($"result" === 0)      //filter the rows
  .drop("leadValue","result") //remove additional columns
  .orderBy($"type")
  .show

输出:

+----+---+---+-----+
|type| f1| f2|value|
+----+---+---+-----+
|   1|  a| xy|   11|
|   2|  b| ab|   13|
|   3|  c|dir|   18|
|   3|  c| ls|   23|
+----+---+---+-----+

由于我们已经按typef1 进行分区,我们不需要检查它们的相等条件

【讨论】:

以上是关于spark根据新行删除前一行,条件匹配的主要内容,如果未能解决你的问题,请参考以下文章

将新行与spark scala中数据框中的前一行数据合并

在 PySpark 中的窗口上获取与某些条件匹配的第一行

如果字符串匹配,如何删除一行

如果一行匹配条件,则 sed 替换与模式范围匹配的行

正则表达式:匹配模式后跟一个空格但不匹配2个或更多空格或EOF

如果 ID 匹配在其他列中插入新行和总值