获取被筛选器从 spark 数据帧中删除的行的示例
Posted
技术标签:
【中文标题】获取被筛选器从 spark 数据帧中删除的行的示例【英文标题】:Get examples for rows that are removed by a filter from a spark dataframe 【发布时间】:2018-07-05 08:13:07 【问题描述】:假设我有一个带有一些列 (id,...) 的 spark 数据框 df
和一个带有 SQL 过滤器的字符串 sqlFilter
,例如"id is not null"
。
我想根据sqlFilter
过滤数据框df
,即
val filtered = df.filter(sqlFilter)
现在,我想要一个列表,其中包含来自 df
的 10 个被过滤器删除的 id。
目前,我正在使用“leftanti”连接来实现这一点,即
val examples = df.select("id").join(filtered.select("id"), Seq("id"), "leftanti")
.take(10)
.map(row => Option(row.get(0)) match case None => "null" case Some(x) => x.toString)
但是,这真的很慢。 我的猜测是这可以更快地实现,因为 spark 只需要每个分区都有一个列表 当过滤器删除一行并且列表包含少于 10 个元素时,将 id 添加到列表中。一旦行动后 过滤完成后,spark 必须从分区中收集所有列表,直到它有 10 个 id。
我想使用here 中描述的累加器,
但我失败了,因为我不知道如何解析和使用sqlFilter
。
有人知道如何提高性能吗?
更新 Ramesh Maharjan 在 cmets 中建议逆 SQL 查询,即
df.filter(s"NOT ($filterString)")
.select(key)
.take(10)
.map(row => Option(row.get(0)) match case None => "null" case Some(x) => x.toString)
这确实提高了性能,但不是 100% 等效的。
如果有多行具有相同的 id,如果由于过滤器而删除了一行,则该 id 将最终出现在示例中。使用 leftantit 连接后,它不会出现在示例中,因为 id 仍在 filtered
中。
不过,这对我来说很好。
如果可以使用累加器或类似的东西“即时”创建列表,我仍然很感兴趣。
更新 2
反转过滤器的另一个问题是 SQL 中的逻辑值 UNKNOWN,因为 NOT UNKNWON = UNKNOWN,即NOT(null <> 1) <=> UNKNOWN
,因此该行既不会出现在过滤的数据帧中,也不会出现在反转的数据帧中。
【问题讨论】:
只需反转 sql 查询并选择 10 个 ids 。它应该更快 【参考方案1】:您可以使用自定义累加器(因为 longAccumulator
不会帮助您,因为所有 id 都将为空);并且您必须将您的过滤器语句表述为函数:
假设你有一个数据框:
+----+--------+
| id| name|
+----+--------+
| 1|record 1|
|null|record 2|
| 3|record 3|
+----+--------+
那么你可以这样做:
import org.apache.spark.util.AccumulatorV2
class RowAccumulator(var value: Seq[Row]) extends AccumulatorV2[Row, Seq[Row]]
def this() = this(Seq.empty[Row])
override def isZero: Boolean = value.isEmpty
override def copy(): AccumulatorV2[Row, Seq[Row]] = new RowAccumulator(value)
override def reset(): Unit = value = Seq.empty[Row]
override def add(v: Row): Unit = value = value :+ v
override def merge(other: AccumulatorV2[Row, Seq[Row]]): Unit = value = value ++ other.value
val filteredAccum = new RowAccumulator()
ss.sparkContext.register(filteredAccum, "Filter Accum")
val filterIdIsNotNull = (r:Row) =>
if(r.isNullAt(r.fieldIndex("id")))
filteredAccum.add(r)
false
else
true
df
.filter(filterIdIsNotNull)
.show()
println(filteredAccum.value)
给了
+---+--------+
| id| name|
+---+--------+
| 1|record 1|
| 3|record 3|
+---+--------+
List([null,record 2])
但我个人不会这样做,我宁愿做你已经建议的事情:
val dfWithFilter = df
.withColumn("keep",expr("id is not null"))
.cache() // check whether caching is feasibly
// show 10 records which we do not keep
dfWithFilter.filter(!$"keep").drop($"keep").show(10) // or use take(10)
+----+--------+
| id| name|
+----+--------+
|null|record 2|
+----+--------+
// rows that we keep
val filteredDf = dfWithFilter.filter($"keep").drop($"keep")
【讨论】:
我的想法是使用我自己的CollectionAccumulator
版本来存储ID。但是,我选择了反向过滤器(keep
列是个好主意),我同意这个解决方案更好,因为它更容易理解和维护。我只是好奇是否可以使用蓄电池,但我想念如何去做。我不能使用 scala 函数,因为过滤器是由业务用户提供的,他们希望为过滤后的行提供示例,以便能够查看源数据并找出数据看起来不像预期的原因。以上是关于获取被筛选器从 spark 数据帧中删除的行的示例的主要内容,如果未能解决你的问题,请参考以下文章