获取被筛选器从 spark 数据帧中删除的行的示例

Posted

技术标签:

【中文标题】获取被筛选器从 spark 数据帧中删除的行的示例【英文标题】:Get examples for rows that are removed by a filter from a spark dataframe 【发布时间】:2018-07-05 08:13:07 【问题描述】:

假设我有一个带有一些列 (id,...) 的 spark 数据框 df 和一个带有 SQL 过滤器的字符串 sqlFilter,例如"id is not null"。 我想根据sqlFilter过滤数据框df,即

val filtered = df.filter(sqlFilter)

现在,我想要一个列表,其中包含来自 df 的 10 个被过滤器删除的 id。

目前,我正在使用“leftanti”连接来实现这一点,即

val examples = df.select("id").join(filtered.select("id"), Seq("id"), "leftanti")
                 .take(10)
                 .map(row => Option(row.get(0)) match  case None => "null" case Some(x) => x.toString)

但是,这真的很慢。 我的猜测是这可以更快地实现,因为 spark 只需要每个分区都有一个列表 当过滤器删除一行并且列表包含少于 10 个元素时,将 id 添加到列表中。一旦行动后 过滤完成后,spark 必须从分区中收集所有列表,直到它有 10 个 id。

我想使用here 中描述的累加器, 但我失败了,因为我不知道如何解析和使用sqlFilter

有人知道如何提高性能吗?

更新 Ramesh Maharjan 在 cmets 中建议逆 SQL 查询,即

df.filter(s"NOT ($filterString)")
          .select(key)
          .take(10)
          .map(row => Option(row.get(0)) match  case None => "null" case Some(x) => x.toString)

这确实提高了性能,但不是 100% 等效的。 如果有多行具有相同的 id,如果由于过滤器而删除了一行,则该 id 将最终出现在示例中。使用 leftantit 连接后,它不会出现在示例中,因为 id 仍在 filtered 中。 不过,这对我来说很好。

如果可以使用累加器或类似的东西“即时”创建列表,我仍然很感兴趣。

更新 2

反转过滤器的另一个问题是 SQL 中的逻辑值 UNKNOWN,因为 NOT UNKNWON = UNKNOWN,即NOT(null <> 1) <=> UNKNOWN,因此该行既不会出现在过滤的数据帧中,也不会出现在反转的数据帧中。

【问题讨论】:

只需反转 sql 查询并选择 10 个 ids 。它应该更快 【参考方案1】:

您可以使用自定义累加器(因为 longAccumulator 不会帮助您,因为所有 id 都将为空);并且您必须将您的过滤器语句表述为函数:

假设你有一个数据框:

+----+--------+
|  id|    name|
+----+--------+
|   1|record 1|
|null|record 2|
|   3|record 3|
+----+--------+

那么你可以这样做:

import org.apache.spark.util.AccumulatorV2

class RowAccumulator(var value: Seq[Row]) extends AccumulatorV2[Row, Seq[Row]] 
  def this() = this(Seq.empty[Row])
  override def isZero: Boolean = value.isEmpty
  override def copy(): AccumulatorV2[Row, Seq[Row]] = new RowAccumulator(value)
  override def reset(): Unit = value = Seq.empty[Row]
  override def add(v: Row): Unit = value = value :+ v
  override def merge(other: AccumulatorV2[Row, Seq[Row]]): Unit = value = value ++ other.value


val filteredAccum = new RowAccumulator()
ss.sparkContext.register(filteredAccum, "Filter Accum")

val filterIdIsNotNull = (r:Row) => 
  if(r.isNullAt(r.fieldIndex("id"))) 
    filteredAccum.add(r)
    false
   else 
    true
  

df
  .filter(filterIdIsNotNull)
  .show()

println(filteredAccum.value)

给了

+---+--------+
| id|    name|
+---+--------+
|  1|record 1|
|  3|record 3|
+---+--------+

List([null,record 2])

但我个人不会这样做,我宁愿做你已经建议的事情:

val dfWithFilter = df
  .withColumn("keep",expr("id is not null"))
  .cache() // check whether caching is feasibly

// show 10 records which we do not keep
dfWithFilter.filter(!$"keep").drop($"keep").show(10) // or use take(10)

+----+--------+
|  id|    name|
+----+--------+
|null|record 2|
+----+--------+

// rows that we keep
val filteredDf = dfWithFilter.filter($"keep").drop($"keep")

【讨论】:

我的想法是使用我自己的CollectionAccumulator 版本来存储ID。但是,我选择了反向过滤器(keep 列是个好主意),我同意这个解决方案更好,因为它更容易理解和维护。我只是好奇是否可以使用蓄电池,但我想念如何去做。我不能使用 scala 函数,因为过滤器是由业务用户提供的,他们希望为过滤后的行提供示例,以便能够查看源数据并找出数据看起来不像预期的原因。

以上是关于获取被筛选器从 spark 数据帧中删除的行的示例的主要内容,如果未能解决你的问题,请参考以下文章

在 spark scala 中为数据帧中的每个组采样不同数量的随机行

C#中的DataTable怎么获取已删除行的信息

获取具有特定数量的重复值的行

删除Spark数据帧中具有句点的列名称

SparkSQL在分组后从数据帧中获取之前和之后的行

spark:合并两个数据帧,如果两个数据帧中的ID重复,则df1中的行覆盖df2中的行