如何将源文件名添加到 Spark 中的每一行?

Posted

技术标签:

【中文标题】如何将源文件名添加到 Spark 中的每一行?【英文标题】:How to add source file name to each row in Spark? 【发布时间】:2015-10-23 01:16:39 【问题描述】:

我是 Spark 的新手,我正在尝试在每个输入行中插入一列,并使用它来自的文件名。

我看到其他人问过类似的问题,但他们的所有答案都使用了wholeTextFile,但我正在尝试对较大的 CSV 文件(使用 Spark-CSV 库读取)、JSON 文件和 Parquet 文件执行此操作(不仅仅是小文本文件)。

我可以使用spark-shell 来获取文件名列表:

val df = sqlContext.read.parquet("/blah/dir")
val names = df.select(inputFileName())
names.show

但这是一个数据框。 我不确定如何将它作为一列添加到每一行(如果该结果的排序与初始数据相同,尽管我认为它总是如此)以及如何将其作为所有输入类型的通用解决方案。

【问题讨论】:

你为什么想要/需要那个? 每条记录都需要显示它最初是哪个文件...当您知道它经过的整个路径时(如格式错误的输入文件),调试起来更容易 【参考方案1】:

我刚刚发现的另一个解决方案是将文件名添加为 DataFrame 中的列之一

val df = sqlContext.read.parquet("/blah/dir")

val dfWithCol = df.withColumn("filename",input_file_name())

参考: spark load data and add filename as dataframe column

【讨论】:

加载文件后.repartition(1)可以得到文件名吗?【参考方案2】:

当您从文本文件创建 RDD 时,您可能希望将数据映射到案例类中,因此您可以在该阶段添加输入源:

case class Person(inputPath: String, name: String, age: Int)
val inputPath = "hdfs://localhost:9000/tmp/demo-input-data/persons.txt"
val rdd = sc.textFile(inputPath).map 
    l =>
      val tokens = l.split(",")
      Person(inputPath, tokens(0), tokens(1).trim().toInt)
  
rdd.collect().foreach(println)

如果您不想将“业务数据”与元数据混在一起:

case class InputSourceMetaData(path: String, size: Long)
case class PersonWithMd(name: String, age: Int, metaData: InputSourceMetaData)

// Fake the size, for demo purposes only
val md = InputSourceMetaData(inputPath, size = -1L)
val rdd = sc.textFile(inputPath).map 
  l =>
    val tokens = l.split(",")
    PersonWithMd(tokens(0), tokens(1).trim().toInt, md)

rdd.collect().foreach(println)

如果你将 RDD 提升为 DataFrame:

import sqlContext.implicits._
val df = rdd.toDF()
df.registerTempTable("x")

你可以这样查询

sqlContext.sql("select name, metadata from x").show()
sqlContext.sql("select name, metadata.path from x").show()
sqlContext.sql("select name, metadata.path, metadata.size from x").show()

更新

您可以使用org.apache.hadoop.fs.FileSystem.listFiles()递归读取HDFS中的文件。

给定值 files(包含 org.apache.hadoop.fs.LocatedFileStatus 的标准 Scala 集合)中的文件名列表,您可以为每个文件创建一个 RDD:

val rdds = files.map  f =>
  val md = InputSourceMetaData(f.getPath.toString, f.getLen)

  sc.textFile(md.path).map 
    l =>
      val tokens = l.split(",")
      PersonWithMd(tokens(0), tokens(1).trim().toInt, md)
  

现在您可以将reduce 的 RDD 列表合并为一个:reduce 的函数将所有 RDD 合并为一个:

val rdd = rdds.reduce(_ ++ _)
rdd.collect().foreach(println)

这可行,但我无法测试这是否能很好地分发/执行大文件。

【讨论】:

我非常欣赏这一点,但唯一的问题是您必须指定输入文件的完整路径和文件名。我只是指定输入目录,拉入位于其中的所有输入文件。 您目前使用的是哪个功能?是wholeTextFiles()吗? 对于 CSV 文件,我使用的是 databricks/spark-csv 库 sqlContext.read.format("com.databricks.spark.csv").load("/path/dir/")。对于镶木地板文件,使用sqlContext.read.parquet("/path/parquetdir/") 我已经更新了显示一般方法的答案。

以上是关于如何将源文件名添加到 Spark 中的每一行?的主要内容,如果未能解决你的问题,请参考以下文章

如何在 Scala/Spark 中为数据框中的每一行编写一个 Json 文件并重命名文件

如何将日期字符串添加到连续写入的日志文件的每一行

如何将 3 个图像添加到 UITableView 的每一行?

Linux:将字数添加到文件的每一行

如何在文本文件中的每一行的开头添加序号

将 Spark Dataframes 的每一行转换为一个字符串,并在 scala 中的每列值之间使用分隔符