Pyspark 在读取目录中的所有 parquet 文件时失败,但在单独处理文件时成功

Posted

技术标签:

【中文标题】Pyspark 在读取目录中的所有 parquet 文件时失败,但在单独处理文件时成功【英文标题】:Pyspark fails when reading all parquet files in directory but succeeds when files processed individually 【发布时间】:2020-07-02 19:50:10 【问题描述】:

我遇到了我的 pyspark 作业间歇性失败的问题。 当我使用下面的代码从目录中读取所有 parquet 文件时,它会引发异常 File "C:\Python27\Lib\site-packages\pyspark\python\lib\pyspark.zip\pyspark\serializers .py”,第 717 行,在 read_int 中 引发 EOFError EOF错误

inputPathWithData = []
    if (path.exists(input)):
        inputPathWithData.append(input + "\\*.parquet")

    if (len(inputPathWithData) > 0):
        parquetFile = spark.read.parquet(*inputPathWithData)
        parquetFile.createOrReplaceTempView("imp_parquetFile")
        imp_df = spark.sql("SELECT  * FROM imp_parquetFile as imp")
        imp_df.write.json(output_path +"Single", compression="gzip")

但是,如果我单独阅读文件,一切都会按预期工作

i = 0
for input_file in os.listdir(input) :
    i+=1
    parquetFile = spark.read.parquet(input + input_file)
    parquetFile.createOrReplaceTempView("imp_parquetFile")
    try:
        imp_df = spark.sql("SELECT * FROM imp_parquetFile")
        imp_df.write.json(output_path + '_ '+ str(i), compression="gzip")
    except:
        print("Issue with Filename: 0".format(input_file))

初始化spark的代码是

spark = SparkSession \
    .builder \
    .appName("scratch_test") \
    .config("spark.debug.maxToStringFields", "100") \
    .config("spark.executor.memory", "10G") \
    .getOrCreate()

【问题讨论】:

【参考方案1】:

您无需指定每个镶木地板文件。只需阅读整个文件夹。

df=spark.read.parquet("/folder/")

【讨论】:

谢谢! .在我的真实用例中,我从多个文件夹中读取文件。有没有办法像我目前对多个文件一样从多个文件夹中读取镶木地板文件? 您可以将目录放在列表中并像现在一样阅读它。不用放 *.parquet dir_list = ['/dir1/','/dir2/'] df=spark.read.parquet(*dir_list)

以上是关于Pyspark 在读取目录中的所有 parquet 文件时失败,但在单独处理文件时成功的主要内容,如果未能解决你的问题,请参考以下文章

从目录读取镶木地板文件时,pyspark不保存

Pyspark:无法从 SparkFiles 读取镶木地板文件

无法使用 Pyspark 2.4.4 读取 s3 存储桶中的镶木地板文件

在 HIVE 中使用 CDH 5.4 和 Spark 1.3.0 和 Parquet 表的 PySpark 中的 Parquet 错误

pyspark文件读写示例-(CSV/JSON/Parquet-单个或多个)

如何使用pyspark使用的通配符读取hdfs文件