Spark迭代HDFS目录
Posted
技术标签:
【中文标题】Spark迭代HDFS目录【英文标题】:Spark iterate HDFS directory 【发布时间】:2015-01-17 09:06:54 【问题描述】:我在 HDFS 上有一个目录目录,我想遍历这些目录。使用 SparkContext 对象是否有任何简单的方法可以通过 Spark 做到这一点?
【问题讨论】:
您的意思是“迭代”,例如获取其中的子目录和文件列表?还是获取所有子目录中的所有文件? 按照列出所有子目录的方式进行迭代。每个子目录都包含一堆我想以不同方式处理的文本文件。 【参考方案1】:您可以使用org.apache.hadoop.fs.FileSystem
。具体来说,FileSystem.listFiles([path], true)
还有 Spark...
FileSystem.get(sc.hadoopConfiguration).listFiles(..., true)
编辑
值得注意的是,良好的做法是获取与Path
的方案相关联的FileSystem
。
path.getFileSystem(sc.hadoopConfiguration).listFiles(path, true)
【讨论】:
真的很好! I had this question,授予,我想这在最初的 spark-submit 调用中不起作用 如何使用创建的 RemoteIterator 创建文件列表?【参考方案2】:如果有人感兴趣,这里是 PySpark 版本:
hadoop = sc._jvm.org.apache.hadoop
fs = hadoop.fs.FileSystem
conf = hadoop.conf.Configuration()
path = hadoop.fs.Path('/hivewarehouse/disc_mrt.db/unified_fact/')
for f in fs.get(conf).listStatus(path):
print(f.getPath(), f.getLen())
在这种特殊情况下,我得到了构成 disc_mrt.unified_fact Hive 表的所有文件的列表。
FileStatus 对象的其他方法,如获取文件大小的 getLen() 描述如下:
Class FileStatus
【讨论】:
赞成 pyspark 版本!工作完美!谢谢! 谢谢你!想知道如何在python中制作它【参考方案3】:import org.apache.hadoop.fs.FileSystem,Path
FileSystem.get( sc.hadoopConfiguration ).listStatus( new Path("hdfs:///tmp")).foreach( x => println(x.getPath ))
这对我有用。
Spark 版本 1.5.0-cdh5.5.2
【讨论】:
这对我来说很好,对于单个文件夹。有没有办法让它在父文件夹级别运行,并获取所有子文件夹中的所有文件?这对我来说非常有帮助/有用。【参考方案4】:这对我有用
FileSystem.get(new URI("hdfs://HAservice:9000"), sc.hadoopConfiguration).listStatus( new Path("/tmp/")).foreach( x => println(x.getPath ))
【讨论】:
【参考方案5】:@Tagar 没有说如何连接远程 hdfs,但this answer 说了:
URI = sc._gateway.jvm.java.net.URI
Path = sc._gateway.jvm.org.apache.hadoop.fs.Path
FileSystem = sc._gateway.jvm.org.apache.hadoop.fs.FileSystem
Configuration = sc._gateway.jvm.org.apache.hadoop.conf.Configuration
fs = FileSystem.get(URI("hdfs://somehost:8020"), Configuration())
status = fs.listStatus(Path('/some_dir/yet_another_one_dir/'))
for fileStatus in status:
print(fileStatus.getPath())
【讨论】:
【参考方案6】:我对其他答案有一些问题(例如“JavaObject”对象不可迭代),但这段代码对我有用
fs = self.spark_contex._jvm.org.apache.hadoop.fs.FileSystem.get(spark_contex._jsc.hadoopConfiguration())
i = fs.listFiles(spark_contex._jvm.org.apache.hadoop.fs.Path(path), False)
while i.hasNext():
f = i.next()
print(f.getPath())
【讨论】:
【参考方案7】:斯卡拉 FileSystem (Apache Hadoop Main 3.2.1 API)
import org.apache.hadoop.fs.FileSystem, Path
import scala.collection.mutable.ListBuffer
val fileSystem : FileSystem =
val conf = new Configuration()
conf.set( "fs.defaultFS", "hdfs://to_file_path" )
FileSystem.get( conf )
val files = fileSystem.listFiles( new Path( path ), false )
val filenames = ListBuffer[ String ]( )
while ( files.hasNext ) filenames += files.next().getPath().toString()
filenames.foreach(println(_))
【讨论】:
【参考方案8】:您也可以尝试使用 globStatus 状态
val listStatus = org.apache.hadoop.fs.FileSystem.get(new URI(url), sc.hadoopConfiguration).globStatus(new org.apache.hadoop.fs.Path(url))
for (urlStatus <- listStatus)
println("urlStatus get Path:"+urlStatus.getPath())
【讨论】:
【参考方案9】:您可以使用以下代码递归遍历父 HDFS 目录,仅存储子目录至第三级。这很有用,如果您需要列出由于数据分区而创建的所有目录(在下面的代码中,三列用于分区):
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
def rememberDirectories(fs: FileSystem, path: List[Path]): List[Path] =
val buff = new ListBuffer[LocatedFileStatus]()
path.foreach(p =>
val iter = fs.listLocatedStatus(p)
while (iter.hasNext()) buff += iter.next()
)
buff.toList.filter(p => p.isDirectory).map(_.getPath)
@tailrec
def getRelevantDirs(fs: FileSystem, p: List[Path], counter: Int = 1): List[Path] =
val levelList = rememberDirectories(fs, p)
if(counter == 3) levelList
else getRelevantDirs(fs, levelList, counter + 1)
【讨论】:
以上是关于Spark迭代HDFS目录的主要内容,如果未能解决你的问题,请参考以下文章
使用 Spark 列出 Hadoop HDFS 目录中的所有文件?
如何使用 spark databricks xml 解析器从 Hdfs 目录加载所有 xml 文件