`filter`/`where` 有条件地应用到 Spark `Dataset`/`Dataframe`

Posted

技术标签:

【中文标题】`filter`/`where` 有条件地应用到 Spark `Dataset`/`Dataframe`【英文标题】:Conditional application of `filter`/`where` to a Spark `Dataset`/`Dataframe` 【发布时间】:2017-06-17 11:30:24 【问题描述】:

大家好,我有一个函数可以从一些 S3 位置加载数据集并返回有趣的数据

private def filterBrowseIndex(spark: SparkSession, s3BrowseIndex: String, mids: Seq[String] = Seq(), indices: Seq[String] = Seq()): Dataset[BrowseIndex] = 
import spark.implicits._

spark
  .sparkContext.textFile(s3BrowseIndex)
  // split text dataset
  .map(line => line.split("\\s+"))
  // get types for attributes
  .map(BrowseIndex.strAttributesToBrowseIndex)
  // cast it to a dataset (requires implicit conversions)
  .toDS()
  // pick rows for the given marketplaces
  .where($"mid".isin(mids: _*))
  // pick rows for the given indices
  .where($"index".isin(indices: _*))

如果有人提供mids = Seq()indices = Seq(),此实现将过滤掉所有内容。另一方面,我希望语义是“仅当 mids 不为空时才应用此 where 子句”(indices 相同),这样如果函数的用户提供空序列,则不会发生过滤。

有没有很好的功能性方法来做到这一点?

【问题讨论】:

【参考方案1】:

如果您不介意稍微复杂的逻辑,Raphael Roth 的回答对于应用过滤器的特定问题是一个不错的选择。适用于任何条件转换的通用解决方案(不仅仅是过滤,也不仅仅是在某个决策分支上什么都不做)是使用transform,例如,

spark
  .sparkContext.textFile(s3BrowseIndex)
  // split text dataset
  .map(line => line.split("\\s+"))
  // get types for attributes
  .map(BrowseIndex.strAttributesToBrowseIndex)
  // cast it to a dataset (requires implicit conversions)
  .toDS()
  .transform  ds =>
    // pick rows for the given marketplaces
    if (mids.isEmpty) ds
    else ds.where($"mid".isin(mids: _*))
  
  .transform  ds =>
    // pick rows for the given indices
    if (indices.isEmpty) ds
    else ds.where($"index".isin(indices: _*))
  

如果您使用的是稳定类型的数据集(或数据帧,Dataset[Row]),transform 可能非常有用,因为您可以构建转换函数序列然后应用它们:

transformations.foldLeft(ds)(_ transform _)

在许多情况下,这种方法有助于代码重用和可测试性。

【讨论】:

【参考方案2】:

您可以使用短路评估,这应该仅在提供 Seqs 不为空时应用过滤器:

import org.apache.spark.sql.functions.lit

spark
    .sparkContext.textFile(s3BrowseIndex)
    // split text dataset
    .map(line => line.split("\\s+"))
    // get types for attributes
    .map(BrowseIndex.strAttributesToBrowseIndex)
    // cast it to a dataset (requires implicit conversions)
    .toDS()
    // pick rows for the given marketplaces
    .where(lit(mids.isEmpty) or $"mid".isin(mids: _*))
    // pick rows for the given indices
    .where(lit(indices.isEmpty) or $"index".isin(indices: _*))

【讨论】:

感谢 Raphael,您的解决方案有效。我投了赞成票!我将选择 Sim 的答案作为这个问题的答案,尽管它具有普遍性和更简单的逻辑。

以上是关于`filter`/`where` 有条件地应用到 Spark `Dataset`/`Dataframe`的主要内容,如果未能解决你的问题,请参考以下文章

Laravel 集合 - 多个 where 条件

如何有条件地应用 Linq 运算符?

Oracle执行计划里的 access和filter有什么区别

如何有条件地在 Django 中应用模板过滤器?

在Zend Framework的2 InputFilter中有条件地要求

根据外部值有条件地应用管道步骤