apache spark - 检查文件是不是存在
Posted
技术标签:
【中文标题】apache spark - 检查文件是不是存在【英文标题】:apache spark - check if file existsapache spark - 检查文件是否存在 【发布时间】:2015-08-05 00:13:41 【问题描述】:我是 spark 新手,我有一个问题。我有一个两步过程,其中第一步将 SUCCESS.txt 文件写入 HDFS 上的某个位置。我的第二步是 spark 作业,必须在开始处理数据之前验证 SUCCESS.txt 文件是否存在。
我检查了 spark API 并没有找到任何检查文件是否存在的方法。任何想法如何处理这个?
我发现的唯一方法是 sc.textFile(hdfs:///SUCCESS.txt).count() 当文件不存在时会抛出异常。我必须捕获该异常并相应地编写我的程序。我不太喜欢这种方法。希望能找到更好的选择。
【问题讨论】:
【参考方案1】:对于 HDFS 中的文件,您可以使用hadoop 的方式来执行此操作:
val conf = sc.hadoopConfiguration
val fs = org.apache.hadoop.fs.FileSystem.get(conf)
val exists = fs.exists(new org.apache.hadoop.fs.Path("/path/on/hdfs/to/SUCCESS.txt"))
【讨论】:
【参考方案2】:对于 Pyspark,您可以在不调用子进程的情况下使用以下方法实现此目的:
fs = sc._jvm.org.apache.hadoop.fs.FileSystem.get(sc._jsc.hadoopConfiguration())
fs.exists(sc._jvm.org.apache.hadoop.fs.Path("path/to/SUCCESS.txt"))
【讨论】:
优雅的解决方案! 终于找到了!【参考方案3】:我会说,最好的方法是通过在传统的 hadoop 文件检查中内部检查文件存在的函数来调用它。
object OutputDirCheck
def dirExists(hdfsDirectory: String): Boolean =
val hadoopConf = new org.apache.hadoop.conf.Configuration()
val fs = org.apache.hadoop.fs.FileSystem.get(hadoopConf)
fs.exists(new org.apache.hadoop.fs.Path(hdfsDirectory))
【讨论】:
【参考方案4】:使用 dbutils:
def path_exists(path):
try:
if len(dbutils.fs.ls(path)) > 0:
return True
except:
return False
【讨论】:
较短的方式:def path_exists(path): return len(dbutils.fs.ls(path)) > 0
【参考方案5】:
适用于 Java 编码人员;
SparkConf sparkConf = new SparkConf().setAppName("myClassname");
SparkContext sparky = new SparkContext(sparkConf);
JavaSparkContext context = new JavaSparkContext(sparky);
FileSystem hdfs = org.apache.hadoop.fs.FileSystem.get(context.hadoopConfiguration());
Path path = new Path(sparkConf.get(path_to_File));
if (!hdfs.exists(path))
//Path does not exist.
else
//Path exist.
【讨论】:
对我来说sparkConf.get(path_to_File)
抱怨文件不存在,这是我试图避免的事情。 new Path(path_to_File)
直接但是有效。【参考方案6】:
对于 pyspark python 用户:
我没有用 python 或 pyspark 找到任何东西,所以我们需要从 python 代码执行 hdfs 命令。这对我有用。
获取文件夹是否存在的hdfs命令:如果为真则返回0
hdfs dfs -test -d /folder-path
获取文件是否存在的hdfs命令:如果为真则返回0
hdfs dfs -test -d /folder-path
为了将其放入 python 代码中,我遵循以下代码行:
import subprocess
def run_cmd(args_list):
proc = subprocess.Popen(args_list, stdout=subprocess.PIPE,
stderr=subprocess.PIPE)
proc.communicate()
return proc.returncode
cmd = ['hdfs', 'dfs', '-test', '-d', "/folder-path"]
code = run_cmd(cmd)
if code == 0:
print('folder exist')
print(code)
如果文件夹存在则输出:
文件夹存在 0
【讨论】:
【参考方案7】:对于 PySpark:
from py4j.protocol import Py4JJavaError
def path_exist(path):
try:
rdd = sc.textFile(path)
rdd.take(1)
return True
except Py4JJavaError as e:
return False
【讨论】:
【参考方案8】:Spark 2.0 或更高版本可以使用 hadoop.fr.FileSystem 的方法exist:
import org.apache.hadoop.fs.FileSystem, Path
import org.apache.spark.sql.SparkSession
object Test extends App
val spark = SparkSession.builder
.master("local[*]")
.appName("BigDataETL - Check if file exists")
.getOrCreate()
val fs = FileSystem.get(spark.sparkContext.hadoopConfiguration)
// This methods returns Boolean (true - if file exists, false - if file doesn't exist
val fileExists = fs.exists(new Path("<parh_to_file>"))
if (fileExists) println("File exists!")
else println("File doesn't exist!")
对于 Spark 1.6 到 2.0
import org.apache.hadoop.fs.FileSystem, Path
import org.apache.spark.SparkConf, SparkContext
object Test extends App
val sparkConf = new SparkConf().setAppName(s"BigDataETL - Check if file exists")
val sc = new SparkContext(sparkConf)
val fs = FileSystem.get(sc.hadoopConfiguration)
val fileExists = fs.exists(new Path("<parh_to_file>"))
if (fileExists) println("File exists!")
else println("File doesn't exist!")
【讨论】:
以上是关于apache spark - 检查文件是不是存在的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Apache POI 和 Java 代码检查 Excel 文件中是不是存在信息
spark - scala - 如何检查配置单元中是不是存在表