使用 Scala/Spark 列出目录中的文件(包括文件信息)

Posted

技术标签:

【中文标题】使用 Scala/Spark 列出目录中的文件(包括文件信息)【英文标题】:List files in directory (including file information) with Scala/Spark 【发布时间】:2021-03-21 06:46:45 【问题描述】:

我对 Scala/Spark 还很陌生,希望你们能帮助我。我想获取在 hdfs 目录中某个时间戳之后创建的文件,以便在 Zeppelin 中进行一些监视。因此我需要一个包含文件名、文件大小和修改日期的列。

我发现这对我来说可以获取我需要的所有信息:

val fs = FileSystem.get(new Configuration())
val dir: String = "some/hdfs/path"
val input_files = fs.listStatus(new Path(dir)).filter(_.getModificationTime> timeInEpoch)

结果我想在 spark 中创建一个 DataFrame,每个文件都有一行及其信息(或至少是上面提到的信息)

val data = sc.parallelize(input_files) 
val dfFromData2 = spark.createDataFrame(data).toDF()

如果我以这种方式尝试,我会得到以下响应:

309: error: overloaded method value createDataFrame with alternatives:
  [A <: Product](data: Seq[A])(implicit evidence$3: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame <and>
  [A <: Product](rdd: org.apache.spark.rdd.RDD[A])(implicit evidence$2: reflect.runtime.universe.TypeTag[A])org.apache.spark.sql.DataFrame
 cannot be applied to (org.apache.spark.rdd.RDD[org.apache.hadoop.fs.FileStatus])
       val dfFromData2 = spark.createDataFrame(data).toDF()

希望你能帮帮我:)

问候

【问题讨论】:

查看这篇文章 - ***.com/questions/61317600/… 【参考方案1】:

如错误消息所示,Hadoop FileStatus 类型不是 Product 的子类型,即元组。 Spark DataFrames 有自己的 SQL 风格的类型系统,它不允许像 FileStatus 这样的任意复杂类型。同样,如果您要尝试对您创建的 RDD 进行操作,您将收到类似的错误,因为 FileStatus 不可序列化。最好的办法是将您需要的数据提取为元组或案例类,并从中创建一个 DataFrame:

case class FileInfo(name : String, modifiedTime : Long, size : Long)

val df = input_files.mapx => 
           FileInfo(x.getPath.toString, x.getModificationTime, x.getLen)
        .toSeq.toDF()

【讨论】:

非常感谢。这正是我一直在寻找的

以上是关于使用 Scala/Spark 列出目录中的文件(包括文件信息)的主要内容,如果未能解决你的问题,请参考以下文章

编程语言-scala:Spark环境搭建

在 Scala / Spark 中通过 CSV 文件中的行有条件地映射以生成另一个 CSV 文件

如何从 Scala Spark 中的 Excel (xls,xlsx) 文件构造数据框?

Azure Datalake Store Gen2 使用 scala spark 库从 Databricks 读取文件

CentOS7+hadoop2.6.4+spark-1.6.1

使用数据框的子集和 spark/scala 中的两个特定字段过滤数据框 [关闭]