Spark迭代HDFS目录

Posted

技术标签:

【中文标题】Spark迭代HDFS目录【英文标题】:Spark iterate HDFS directory 【发布时间】:2015-01-17 09:06:54 【问题描述】:

我在 HDFS 上有一个目录目录,我想遍历这些目录。使用 SparkContext 对象是否有任何简单的方法可以通过 Spark 做到这一点?

【问题讨论】:

您的意思是“迭代”,例如获取其中的子目录和文件列表?还是获取所有子目录中的所有文件? 按照列出所有子目录的方式进行迭代。每个子目录都包含一堆我想以不同方式处理的文本文件。 【参考方案1】:

您可以使用org.apache.hadoop.fs.FileSystem。具体来说,FileSystem.listFiles([path], true)

还有 Spark...

FileSystem.get(sc.hadoopConfiguration).listFiles(..., true)

编辑

值得注意的是,良好的做法是获取与Path 的方案相关联的FileSystem

path.getFileSystem(sc.hadoopConfiguration).listFiles(path, true)

【讨论】:

真的很好! I had this question,授予,我想这在最初的 spark-submit 调用中不起作用 如何使用创建的 RemoteIterator 创建文件列表?【参考方案2】:

如果有人感兴趣,这里是 PySpark 版本:

    hadoop = sc._jvm.org.apache.hadoop

    fs = hadoop.fs.FileSystem
    conf = hadoop.conf.Configuration() 
    path = hadoop.fs.Path('/hivewarehouse/disc_mrt.db/unified_fact/')

    for f in fs.get(conf).listStatus(path):
        print(f.getPath(), f.getLen())

在这种特殊情况下,我得到了构成 disc_mrt.unified_fact Hive 表的所有文件的列表。

FileStatus 对象的其他方法,如获取文件大小的 getLen() 描述如下:

Class FileStatus

【讨论】:

赞成 pyspark 版本!工作完美!谢谢! 谢谢你!想知道如何在python中制作它【参考方案3】:
import  org.apache.hadoop.fs.FileSystem,Path

FileSystem.get( sc.hadoopConfiguration ).listStatus( new Path("hdfs:///tmp")).foreach( x => println(x.getPath ))

这对我有用。

Spark 版本 1.5.0-cdh5.5.2

【讨论】:

这对我来说很好,对于单个文件夹。有没有办法让它在父文件夹级别运行,并获取所有子文件夹中的所有文件?这对我来说非常有帮助/有用。【参考方案4】:

这对我有用

FileSystem.get(new URI("hdfs://HAservice:9000"), sc.hadoopConfiguration).listStatus( new Path("/tmp/")).foreach( x => println(x.getPath ))

【讨论】:

【参考方案5】:

@Tagar 没有说如何连接远程 hdfs,但this answer 说了:

URI           = sc._gateway.jvm.java.net.URI
Path          = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem    = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration


fs = FileSystem.get(URI("hdfs://somehost:8020"), Configuration())

status = fs.listStatus(Path('/some_dir/yet_another_one_dir/'))

for fileStatus in status:
    print(fileStatus.getPath())

【讨论】:

【参考方案6】:

我对其他答案有一些问题(例如“JavaObject”对象不可迭代),但这段代码对我有用

fs = self.spark_contex._jvm.org.apache.hadoop.fs.FileSystem.get(spark_contex._jsc.hadoopConfiguration())
i = fs.listFiles(spark_contex._jvm.org.apache.hadoop.fs.Path(path), False)
while i.hasNext():
  f = i.next()
  print(f.getPath())

【讨论】:

【参考方案7】:

斯卡拉 FileSystem (Apache Hadoop Main 3.2.1 API)

    import org.apache.hadoop.fs.FileSystem, Path
    import scala.collection.mutable.ListBuffer

    
    val fileSystem : FileSystem = 
        val conf = new Configuration()
        conf.set( "fs.defaultFS", "hdfs://to_file_path" )
        FileSystem.get( conf )
    
      
    val files = fileSystem.listFiles( new Path( path ), false )
    val filenames = ListBuffer[ String ]( )
    while ( files.hasNext ) filenames += files.next().getPath().toString()
    filenames.foreach(println(_))

【讨论】:

【参考方案8】:

您也可以尝试使用 globStatus 状态

val listStatus = org.apache.hadoop.fs.FileSystem.get(new URI(url), sc.hadoopConfiguration).globStatus(new org.apache.hadoop.fs.Path(url))

      for (urlStatus <- listStatus) 
        println("urlStatus get Path:"+urlStatus.getPath())

【讨论】:

【参考方案9】:

您可以使用以下代码递归遍历父 HDFS 目录,仅存储子目录至第三级。这很有用,如果您需要列出由于数据分区而创建的所有目录(在下面的代码中,三列用于分区):

val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)

def rememberDirectories(fs: FileSystem, path: List[Path]): List[Path] = 
  val buff = new ListBuffer[LocatedFileStatus]()

  path.foreach(p => 
    val iter = fs.listLocatedStatus(p)
    while (iter.hasNext()) buff += iter.next()
  )

  buff.toList.filter(p => p.isDirectory).map(_.getPath)


@tailrec
def getRelevantDirs(fs: FileSystem, p: List[Path], counter: Int = 1): List[Path] = 
  val levelList = rememberDirectories(fs, p)
  if(counter == 3) levelList
  else getRelevantDirs(fs, levelList, counter + 1)

【讨论】:

以上是关于Spark迭代HDFS目录的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spark 列出 Hadoop HDFS 目录中的所有文件?

如何使用 spark databricks xml 解析器从 Hdfs 目录加载所有 xml 文件

spark work目录处理 And HDFS空间都去哪了?

Hadoop技术之HDFS分布式文件系统基础

【大数据】Spark 递归读取 HDFS

Spark Streaming 002 统计单词的例子