apache spark - 检查文件是不是存在

Posted

技术标签:

【中文标题】apache spark - 检查文件是不是存在【英文标题】:apache spark - check if file existsapache spark - 检查文件是否存在 【发布时间】:2015-08-05 00:13:41 【问题描述】:

我是 spark 新手,我有一个问题。我有一个两步过程,其中第一步将 SUCCESS.txt 文件写入 HDFS 上的某个位置。我的第二步是 spark 作业,必须在开始处理数据之前验证 SUCCESS.txt 文件是否存在。

我检查了 spark API 并没有找到任何检查文件是否存在的方法。任何想法如何处理这个?

我发现的唯一方法是 sc.textFile(hdfs:///SUCCESS.txt).count() 当文件不存在时会抛出异常。我必须捕获该异常并相应地编写我的程序。我不太喜欢这种方法。希望能找到更好的选择。

【问题讨论】:

【参考方案1】:

对于 HDFS 中的文件,您可以使用hadoop 的方式来执行此操作:

val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val exists = fs.exists(new org.apache.hadoop.fs.Path("/path/on/hdfs/to/SUCCESS.txt"))

【讨论】:

【参考方案2】:

对于 Pyspark,您可以在不调用子进程的情况下使用以下方法实现此目的:

fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))

【讨论】:

优雅的解决方案! 终于找到了!【参考方案3】:

我会说,最好的方法是通过在传统的 hadoop 文件检查中内部检查文件存在的函数来调用它。

object OutputDirCheck 
  def dirExists(hdfsDirectory: String): Boolean = 
    val hadoopConf = new org.apache.hadoop.conf.Configuration()
    val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
    fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
  

【讨论】:

【参考方案4】:

使用 dbutils

def path_exists(path):
  try:
    if len(dbutils.fs.ls(path)) > 0:
      return True
  except:
    return False

【讨论】:

较短的方式:def path_exists(path): return len(dbutils.fs.ls(path)) > 0【参考方案5】:

适用于 Java 编码人员;

 SparkConf sparkConf = new SparkConf().setAppName("myClassname");
        SparkContext sparky = new SparkContext(sparkConf);       
        JavaSparkContext context = new JavaSparkContext(sparky);

     FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
            Path path = new Path(sparkConf.get(path_to_File));

            if (!hdfs.exists(path)) 
                 //Path does not exist.
             
         else
               //Path exist.
           

【讨论】:

对我来说sparkConf.get(path_to_File) 抱怨文件不存在,这是我试图避免的事情。 new Path(path_to_File) 直接但是有效。【参考方案6】:

对于 pyspark python 用户:

我没有用 python 或 pyspark 找到任何东西,所以我们需要从 python 代码执行 hdfs 命令。这对我有用。

获取文件夹是否存在的hdfs命令:如果为真则返回0

hdfs dfs -test -d /folder-path

获取文件是否存在的hdfs命令:如果为真则返回0

hdfs dfs -test -d /folder-path 

为了将其放入 python 代码中,我遵循以下代码行:

import subprocess

def run_cmd(args_list):
    proc = subprocess.Popen(args_list, stdout=subprocess.PIPE,
            stderr=subprocess.PIPE)
    proc.communicate()
    return proc.returncode

cmd = ['hdfs', 'dfs', '-test', '-d', "/folder-path"]
            code = run_cmd(cmd)
if code == 0:
    print('folder exist')
    print(code) 

如果文件夹存在则输出:

文件夹存在 0

【讨论】:

【参考方案7】:

对于 PySpark:

from py4j.protocol import Py4JJavaError
def path_exist(path):
    try:
        rdd = sc.textFile(path)
        rdd.take(1)
        return True
    except Py4JJavaError as e:
        return False

【讨论】:

【参考方案8】:

Spark 2.0 或更高版本可以使用 hadoop.fr.FileSystem 的方法exist:

import org.apache.hadoop.fs.FileSystem, Path
import org.apache.spark.sql.SparkSession

object Test extends App 
  val spark = SparkSession.builder
    .master("local[*]")
    .appName("BigDataETL - Check if file exists")
    .getOrCreate()

  val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
  // This methods returns Boolean (true - if file exists, false - if file doesn't exist
  val fileExists = fs.exists(new Path("<parh_to_file>"))
  if (fileExists) println("File exists!")
  else println("File doesn't exist!")

对于 Spark 1.6 到 2.0

import org.apache.hadoop.fs.FileSystem, Path
import org.apache.spark.SparkConf, SparkContext

object Test extends App 
  val sparkConf = new SparkConf().setAppName(s"BigDataETL - Check if file exists")
  val sc = new SparkContext(sparkConf)
  val fs = FileSystem.get(sc.hadoopConfiguration)
  val fileExists = fs.exists(new Path("<parh_to_file>"))
  if (fileExists) println("File exists!")
  else println("File doesn't exist!")

【讨论】:

以上是关于apache spark - 检查文件是不是存在的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Apache POI 和 Java 代码检查 Excel 文件中是不是存在信息

spark - scala - 如何检查配置单元中是不是存在表

如何使用 spark/scala 检查是不是存在大查询表

检查 DF 中是不是存在列 - Java Spark

使用 Java 检查 Spark Dataframe 中是不是存在列

Perl LWP::Simple::getstore 如何检查文件是不是存在于目标目录中