Spark:仅当路径存在时才读取文件

Posted

技术标签:

【中文标题】Spark:仅当路径存在时才读取文件【英文标题】:Spark : Read file only if the path exists 【发布时间】:2017-12-24 22:01:55 【问题描述】:

我正在尝试读取 scala 中路径的Sequence 中存在的文件。以下是示例(伪)代码:

val paths = Seq[String] //Seq of paths
val dataframe = spark.read.parquet(paths: _*)

现在,在上述序列中,有些路径存在而有些则不存在。在读取parquet 文件时有什么方法可以忽略丢失的路径(以避免org.apache.spark.sql.AnalysisException: Path does not exist)?

我已经尝试了以下方法,它似乎可以正常工作,但是,我最终两次读取相同的路径,这是我想避免做的事情:

val filteredPaths = paths.filter(p => Try(spark.read.parquet(p)).isSuccess)

我检查了DataFrameReaderoptions 方法,但似乎没有任何类似于ignore_if_missing 的选项。

此外,这些路径可以是hdfss3(此Seq 作为方法参数传递),在阅读时,我不知道路径是s3 还是hdfs 所以可以'不要使用s3hdfs 特定的API 来检查是否存在。

【问题讨论】:

【参考方案1】:

您可以像@Psidom 的回答一样过滤掉不相关的文件。在 spark 中,最好的方法是使用内部 spark hadoop 配置。鉴于 spark 会话变量称为“spark”,您可以这样做:

import org.apache.hadoop.fs.FileSystem
import org.apache.hadoop.fs.Path

val hadoopfs: FileSystem = FileSystem.get(spark.sparkContext.hadoopConfiguration)

def testDirExist(path: String): Boolean = 
  val p = new Path(path)
  hadoopfs.exists(p) && hadoopfs.getFileStatus(p).isDirectory

val filteredPaths = paths.filter(p => testDirExists(p))
val dataframe = spark.read.parquet(filteredPaths: _*)

【讨论】:

根据您的系统设置,您可能需要在 get 中指定您的文件系统位置:FileSystem.get(new URI("s3://bucket"), spark.sparkContext.hadoopConfiguration)。否则,它可能会在检查 S3 文件系统的路径时创建 HDFS 文件系统和 barf。【参考方案2】:

先过滤paths怎么样`:

paths.filter(f => new java.io.File(f).exists)

例如:

Seq("/tmp", "xx").filter(f => new java.io.File(f).exists)
// res18: List[String] = List(/tmp)

【讨论】:

Paths 可以是本地的hdfs 路径或s3 路径。不确定File.exists 是否适用于s3 如果路径是 HDFS / S3 路径(通常与 Spark 一起使用),则需要稍微不同的 API 来检查路径是否存在。 [@DarshanMehta 你比我快 3 秒 :)] @TzachZohar 哈哈是的。我现在更新了问题。 对于 S3,您可能需要查看 doesObjectExist,对于 hdfs,您可以查看 this answer。【参考方案3】:

从 Spark 2.3.0 开始有一个配置 spark.sql.files.ignoreMissingFiles。只需将其设置为true

https://spark.apache.org/docs/latest/configuration.html

【讨论】:

以上是关于Spark:仅当路径存在时才读取文件的主要内容,如果未能解决你的问题,请参考以下文章

当路径正确时,为啥 NSFIleManager -fileExistsAtPath 找不到现有文件?

如何仅读取目录中的前 5 个文件夹:Spark

spark读取parquet文件报错:Unable to infer schema when loading Parquet file

spark读取文件时对字符编码的支持

Spark 读取带有部分模式的 json

python文件处理集锦