在 Apache Spark 中为每行迭代添加范围变量

Posted

技术标签:

【中文标题】在 Apache Spark 中为每行迭代添加范围变量【英文标题】:Add scoped variable per row iteration in Apache Spark 【发布时间】:2017-07-21 09:03:14 【问题描述】:

我正在将多个 html 文件读入 Spark 中的数据框。 我正在使用自定义 udf 将 html 的元素转换为数据框中的列

val dataset = spark
  .sparkContext
  .wholeTextFiles(inputPath)
  .toDF("filepath", "filecontent")
  .withColumn("biz_name", parseDocValue(".biz-page-title")('filecontent))
  .withColumn("biz_website", parseDocValue(".biz-website a")('filecontent))

  ...

  def parseDocValue(cssSelectorQuery: String) = 
     udf((html: String) => Jsoup.parse(html).select(cssSelectorQuery).text())

效果很好,但是每个withColumn 调用都会导致对html字符串的解析,这是多余的。

有没有办法(不使用查找表等)我可以根据每行的“文件内容”列生成 1 个已解析的文档 (Jsoup.parse(html)),并使其可用于数据框中的所有 withColumn 调用?

或者我不应该尝试使用 DataFrames 而只使用 RDD 吗?

【问题讨论】:

你能用示例文本字符串更新吗? 我在本质上遇到了“wholeTextFiles”的非并行化问题(例如,在我什至可以重新分区之前,64 个核心集群上的 2 个执行程序),所以我可能会重写整个事情。当我解决这个问题时,我会更新并查看建议。很抱歉给您带来不便 你解决了吗? 不,我发现我有一个更大的问题,我必须先解决这个问题。不过,不知道解决方案是否会让我回到这一点。 【参考方案1】:

所以最后的答案其实很简单:

只需映射行并在那里创建对象

def docValue(cssSelectorQuery: String, attr: Option[String] = None)(implicit document: Document): Option[String] = 
    val domObject = document.select(cssSelectorQuery)

    val domValue = attr match 
      case Some(a) => domObject.attr(a)
      case None => domObject.text()
    

    domValue match 
      case x if x == null || x.isEmpty => None
      case y => Some(y)
    
  

 val dataset = spark
      .sparkContext
      .wholeTextFiles(inputPath, minPartitions = 265) 
      .map 
        case (filepath, filecontent) => 
          implicit val document = Jsoup.parse(filecontent)

          val customDataJson = docJson(filecontent, customJsonRegex)


          DataEntry(
            biz_name = docValue(".biz-page-title"),
            biz_website = docValue(".biz-website a"),
            url = docValue("meta[property=og:url]", attr = Some("content")),
            ...
            filename = Some(fileName(filepath)),
            fileTimestamp = Some(fileTimestamp(filepath))
          )
        
      
      .toDS()

【讨论】:

【参考方案2】:

我可能会重写如下,一次性完成解析和选择并将它们放在一个临时列中:

val dataset = spark
  .sparkContext
  .wholeTextFiles(inputPath)
  .withColumn("temp", parseDocValue(Array(".biz-page-title", ".biz-website a"))('filecontent))
  .withColumn("biz_name", col("temp")(0))
  .withColumn("biz_website", col("temp")(1))
  .drop("temp")

def parseDocValue(cssSelectorQueries: Array[String]) =
udf((html: String) => 
  val j = Jsoup.parse(html)
  cssSelectorQueries.map(query => j.select(query).text()))

【讨论】:

以上是关于在 Apache Spark 中为每行迭代添加范围变量的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Apache Spark 中为具有不同结构的两个 DataFrame 实现 NOT IN

apache spark - 迭代地跳过并从 RDD 中获取

在使用 Scala 中的 Spark 写入 JSON 格式之前,在每行前面添加一个新行

Apache Spark:迭代数据帧行并通过 MutableList (Scala) 创建新数据帧

如何在 Spark on YARN 中为 Spark UI 创建安全过滤器

在 Spark 数据框的列中为每个组添加递增的数字