根据火花数据框scala中的列值过滤行

Posted

技术标签:

【中文标题】根据火花数据框scala中的列值过滤行【英文标题】:Filtering rows based on column values in spark dataframe scala 【发布时间】:2016-04-02 15:13:50 【问题描述】:

我有一个数据框(火花):

id  value 
3     0
3     1
3     0
4     1
4     0
4     0

我想创建一个新的数据框:

3 0
3 1
4 1

需要为每个 id 删除 1(value) 之后的所有行。我尝试使用 spark dateframe(Scala) 中的窗口函数。但无法找到解决方案。似乎我走错了方向。

我正在寻找 Scala 中的解决方案。谢谢

使用 monotonically_increasing_id 输出

 scala> val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data: org.apache.spark.sql.DataFrame = [id: int, value: int]

scala> val minIdx = dataWithIndex.filter($"value" === 1).groupBy($"id").agg(min($"idx")).toDF("r_id", "min_idx")
minIdx: org.apache.spark.sql.DataFrame = [r_id: int, min_idx: bigint]

scala> dataWithIndex.join(minIdx,($"r_id" === $"id") && ($"idx" <= $"min_idx")).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
|  3|    0|
|  3|    1|
|  4|    1|
+---+-----+

如果我们在原始数据框中进行排序转换,该解决方案将不起作用。那个时候monotonically_increasing_id()是基于原始DF而不是排序DF生成的。我之前错过了这个要求。

欢迎所有建议。

【问题讨论】:

到目前为止你尝试了什么? @eliasah 我根据***.com/questions/32148208/… 的答案尝试了一些实验。但到目前为止没有成功 你的 DF 排序了吗? @TheArchetypalPaul 是的,它已排序 因为你每次都调用show。在我下面的代码中,评估是懒惰的——原始的val dataWithIndex 仅在我的最终show 被调用时才被评估。但是你每次都打电话show,迫使重新评估。停止调用show,或创建dataWithIndex后立即调用cache 【参考方案1】:

一种方法是使用monotonically_increasing_id() 和自联接:

val data = Seq((3,0),(3,1),(3,0),(4,1),(4,0),(4,0)).toDF("id", "value")
data.show
+---+-----+
| id|value|
+---+-----+
|  3|    0|
|  3|    1|
|  3|    0|
|  4|    1|
|  4|    0|
|  4|    0|
+---+-----+

现在我们生成一个名为idx 的列,Long 增加:

val dataWithIndex = data.withColumn("idx", monotonically_increasing_id())
// dataWithIndex.cache()

现在我们得到每个idmin(idx),其中value = 1

val minIdx = dataWithIndex
               .filter($"value" === 1)
               .groupBy($"id")
               .agg(min($"idx"))
               .toDF("r_id", "min_idx")

现在我们加入min(idx)回到原来的DataFrame

dataWithIndex.join(
  minIdx,
  ($"r_id" === $"id") && ($"idx" <= $"min_idx")
).select($"id", $"value").show
+---+-----+
| id|value|
+---+-----+
|  3|    0|
|  3|    1|
|  4|    1|
+---+-----+

注意: monotonically_increasing_id() 根据行的分区生成其值。每次重新评估 dataWithIndex 时,此值可能会发生变化。在我上面的代码中,由于延迟评估,只有当我调用最终的 show 时,才会评估 monotonically_increasing_id()

如果您想强制该值保持不变,例如,您可以使用show 逐步评估上述内容,请取消注释上面的这一行:

//  dataWithIndex.cache()

【讨论】:

是的,不要对monotonically_increasing_id() 生成的列看得太深——你每次看它时可能会得到不同的值——你看到的数字是基于分区方案的。只运行代码,不要看中间值。它有效。 如果为了理智起见,您希望每次都看到相同的值 - 添加行 dataWithIndex.cache()。但这并不会改变整体结果——它只是让您可以在显微镜下观察每一步,而不会觉得自己发疯了。 感谢@davidGirffin。我没有得到正确的输出,那是我检查了中间结果。我已经更新了问题本身的输出。请你看看。 已答复。这是因为每次您拨打show 时都会强制重新评估。这就像量子力学——你通过观察它来改变价值。如果你像我一样运行代码——只有最后一个show——它会得到正确的结果。【参考方案2】:

您好,我找到了使用 Window 和 self join 的解决方案。

val data = Seq((3,0,2),(3,1,3),(3,0,1),(4,1,6),(4,0,5),(4,0,4),(1,0,7),(1,1,8),(1,0,9),(2,1,10),(2,0,11),(2,0,12)).toDF("id", "value","sorted")

data.show

scala> data.show
+---+-----+------+
| id|value|sorted|
+---+-----+------+
|  3|    0|     2|
|  3|    1|     3|
|  3|    0|     1|
|  4|    1|     6|
|  4|    0|     5|
|  4|    0|     4|
|  1|    0|     7|
|  1|    1|     8|
|  1|    0|     9|
|  2|    1|    10|
|  2|    0|    11|
|  2|    0|    12|
+---+-----+------+




val sort_df=data.sort($"sorted")

scala> sort_df.show
+---+-----+------+
| id|value|sorted|
+---+-----+------+
|  3|    0|     1|
|  3|    0|     2|
|  3|    1|     3|
|  4|    0|     4|
|  4|    0|     5|
|  4|    1|     6|
|  1|    0|     7|
|  1|    1|     8|
|  1|    0|     9|
|  2|    1|    10|
|  2|    0|    11|
|  2|    0|    12|
+---+-----+------+



var window=Window.partitionBy("id").orderBy("$sorted")

 val sort_idx=sort_df.select($"*",rowNumber.over(window).as("count_index"))

val minIdx=sort_idx.filter($"value"===1).groupBy("id").agg(min("count_index")).toDF("idx","min_idx")

val result_id=sort_idx.join(minIdx,($"id"===$"idx") &&($"count_index" <= $"min_idx"))

result_id.show

+---+-----+------+-----------+---+-------+
| id|value|sorted|count_index|idx|min_idx|
+---+-----+------+-----------+---+-------+
|  1|    0|     7|          1|  1|      2|
|  1|    1|     8|          2|  1|      2|
|  2|    1|    10|          1|  2|      1|
|  3|    0|     1|          1|  3|      3|
|  3|    0|     2|          2|  3|      3|
|  3|    1|     3|          3|  3|      3|
|  4|    0|     4|          1|  4|      3|
|  4|    0|     5|          2|  4|      3|
|  4|    1|     6|          3|  4|      3|
+---+-----+------+-----------+---+-------+

仍在寻找更优化的解决方案。谢谢

【讨论】:

【参考方案3】:

您可以像这样简单地使用groupBy

val df2 = df1.groupBy("id","value").count().select("id","value")

你的df1在这里

id  value 
3     0
3     1
3     0
4     1
4     0
4     0

结果数据框是df2,这是您的预期输出

id  value 
3     0
3     1
4     1
4     0

【讨论】:

【参考方案4】:
use isin method and filter as below:

val data = Seq((3,0,2),(3,1,3),(3,0,1),(4,1,6),(4,0,5),(4,0,4),(1,0,7),(1,1,8),(1,0,9),(2,1,10),(2,0,11),(2,0,12)).toDF("id", "value","sorted")
val idFilter = List(1, 2)
 data.filter($"id".isin(idFilter:_*)).show
+---+-----+------+
| id|value|sorted|
+---+-----+------+
|  1|    0|     7|
|  1|    1|     8|
|  1|    0|     9|
|  2|    1|    10|
|  2|    0|    11|
|  2|    0|    12|
+---+-----+------+

Ex: filter based on val
val valFilter = List(0)
data.filter($"value".isin(valFilter:_*)).show
+---+-----+------+
| id|value|sorted|
+---+-----+------+
|  3|    0|     2|
|  3|    0|     1|
|  4|    0|     5|
|  4|    0|     4|
|  1|    0|     7|
|  1|    0|     9|
|  2|    0|    11|
|  2|    0|    12|
+---+-----+------+

【讨论】:

以上是关于根据火花数据框scala中的列值过滤行的主要内容,如果未能解决你的问题,请参考以下文章

按 R 中的列值过滤列表中的每个数据框

折叠火花数据框中的列值

如果数据帧基于列值上的过滤器,则从字典中提取行数据

在python中过滤与列表值匹配的列值的数据框[重复]

将行值转换为火花数据框中的列数组

使用 scala 使用布尔运算折叠火花数据框中的列