在 Apache Spark 中为每行迭代添加范围变量
Posted
技术标签:
【中文标题】在 Apache Spark 中为每行迭代添加范围变量【英文标题】:Add scoped variable per row iteration in Apache Spark 【发布时间】:2017-07-21 09:03:14 【问题描述】:我正在将多个 html 文件读入 Spark 中的数据框。 我正在使用自定义 udf 将 html 的元素转换为数据框中的列
val dataset = spark
.sparkContext
.wholeTextFiles(inputPath)
.toDF("filepath", "filecontent")
.withColumn("biz_name", parseDocValue(".biz-page-title")('filecontent))
.withColumn("biz_website", parseDocValue(".biz-website a")('filecontent))
...
def parseDocValue(cssSelectorQuery: String) =
udf((html: String) => Jsoup.parse(html).select(cssSelectorQuery).text())
效果很好,但是每个withColumn
调用都会导致对html字符串的解析,这是多余的。
有没有办法(不使用查找表等)我可以根据每行的“文件内容”列生成 1 个已解析的文档 (Jsoup.parse(html)
),并使其可用于数据框中的所有 withColumn
调用?
或者我不应该尝试使用 DataFrames 而只使用 RDD 吗?
【问题讨论】:
你能用示例文本字符串更新吗? 我在本质上遇到了“wholeTextFiles”的非并行化问题(例如,在我什至可以重新分区之前,64 个核心集群上的 2 个执行程序),所以我可能会重写整个事情。当我解决这个问题时,我会更新并查看建议。很抱歉给您带来不便 你解决了吗? 不,我发现我有一个更大的问题,我必须先解决这个问题。不过,不知道解决方案是否会让我回到这一点。 【参考方案1】:所以最后的答案其实很简单:
只需映射行并在那里创建对象
def docValue(cssSelectorQuery: String, attr: Option[String] = None)(implicit document: Document): Option[String] =
val domObject = document.select(cssSelectorQuery)
val domValue = attr match
case Some(a) => domObject.attr(a)
case None => domObject.text()
domValue match
case x if x == null || x.isEmpty => None
case y => Some(y)
val dataset = spark
.sparkContext
.wholeTextFiles(inputPath, minPartitions = 265)
.map
case (filepath, filecontent) =>
implicit val document = Jsoup.parse(filecontent)
val customDataJson = docJson(filecontent, customJsonRegex)
DataEntry(
biz_name = docValue(".biz-page-title"),
biz_website = docValue(".biz-website a"),
url = docValue("meta[property=og:url]", attr = Some("content")),
...
filename = Some(fileName(filepath)),
fileTimestamp = Some(fileTimestamp(filepath))
)
.toDS()
【讨论】:
【参考方案2】:我可能会重写如下,一次性完成解析和选择并将它们放在一个临时列中:
val dataset = spark
.sparkContext
.wholeTextFiles(inputPath)
.withColumn("temp", parseDocValue(Array(".biz-page-title", ".biz-website a"))('filecontent))
.withColumn("biz_name", col("temp")(0))
.withColumn("biz_website", col("temp")(1))
.drop("temp")
def parseDocValue(cssSelectorQueries: Array[String]) =
udf((html: String) =>
val j = Jsoup.parse(html)
cssSelectorQueries.map(query => j.select(query).text()))
【讨论】:
以上是关于在 Apache Spark 中为每行迭代添加范围变量的主要内容,如果未能解决你的问题,请参考以下文章
如何在 Apache Spark 中为具有不同结构的两个 DataFrame 实现 NOT IN
apache spark - 迭代地跳过并从 RDD 中获取
在使用 Scala 中的 Spark 写入 JSON 格式之前,在每行前面添加一个新行
Apache Spark:迭代数据帧行并通过 MutableList (Scala) 创建新数据帧