`filter`/`where` 有条件地应用到 Spark `Dataset`/`Dataframe`
Posted
技术标签:
【中文标题】`filter`/`where` 有条件地应用到 Spark `Dataset`/`Dataframe`【英文标题】:Conditional application of `filter`/`where` to a Spark `Dataset`/`Dataframe` 【发布时间】:2017-06-17 11:30:24 【问题描述】:大家好,我有一个函数可以从一些 S3 位置加载数据集并返回有趣的数据
private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] =
import spark.implicits._
spark
.sparkContext.textFile(s3BrowseIndex)
// split text dataset
.map(line => line.split("\\s+"))
// get types for attributes
.map(BrowseIndex.strAttributesToBrowseIndex)
// cast it to a dataset (requires implicit conversions)
.toDS()
// pick rows for the given marketplaces
.where($"mid".isin(mids: _*))
// pick rows for the given indices
.where($"index".isin(indices: _*))
如果有人提供mids = Seq()
或indices = Seq()
,此实现将过滤掉所有内容。另一方面,我希望语义是“仅当 mids
不为空时才应用此 where 子句”(indices
相同),这样如果函数的用户提供空序列,则不会发生过滤。
有没有很好的功能性方法来做到这一点?
【问题讨论】:
【参考方案1】:如果您不介意稍微复杂的逻辑,Raphael Roth 的回答对于应用过滤器的特定问题是一个不错的选择。适用于任何条件转换的通用解决方案(不仅仅是过滤,也不仅仅是在某个决策分支上什么都不做)是使用transform
,例如,
spark
.sparkContext.textFile(s3BrowseIndex)
// split text dataset
.map(line => line.split("\\s+"))
// get types for attributes
.map(BrowseIndex.strAttributesToBrowseIndex)
// cast it to a dataset (requires implicit conversions)
.toDS()
.transform ds =>
// pick rows for the given marketplaces
if (mids.isEmpty) ds
else ds.where($"mid".isin(mids: _*))
.transform ds =>
// pick rows for the given indices
if (indices.isEmpty) ds
else ds.where($"index".isin(indices: _*))
如果您使用的是稳定类型的数据集(或数据帧,Dataset[Row]
),transform
可能非常有用,因为您可以构建转换函数序列然后应用它们:
transformations.foldLeft(ds)(_ transform _)
在许多情况下,这种方法有助于代码重用和可测试性。
【讨论】:
【参考方案2】:您可以使用短路评估,这应该仅在提供 Seq
s 不为空时应用过滤器:
import org.apache.spark.sql.functions.lit
spark
.sparkContext.textFile(s3BrowseIndex)
// split text dataset
.map(line => line.split("\\s+"))
// get types for attributes
.map(BrowseIndex.strAttributesToBrowseIndex)
// cast it to a dataset (requires implicit conversions)
.toDS()
// pick rows for the given marketplaces
.where(lit(mids.isEmpty) or $"mid".isin(mids: _*))
// pick rows for the given indices
.where(lit(indices.isEmpty) or $"index".isin(indices: _*))
【讨论】:
感谢 Raphael,您的解决方案有效。我投了赞成票!我将选择 Sim 的答案作为这个问题的答案,尽管它具有普遍性和更简单的逻辑。以上是关于`filter`/`where` 有条件地应用到 Spark `Dataset`/`Dataframe`的主要内容,如果未能解决你的问题,请参考以下文章
Oracle执行计划里的 access和filter有什么区别