使用 Pyspark 在 s3 中写入镶木地板文件时出错

Posted

技术标签:

【中文标题】使用 Pyspark 在 s3 中写入镶木地板文件时出错【英文标题】:Error writing parquet file in s3 with Pyspark 【发布时间】:2020-07-10 09:01:38 【问题描述】:

我正在尝试读取一些表(parquet 文件)并进行一些连接并将它们写入 S3 中的 parquet 格式,但是我遇到了一个错误或花费了几个小时来编写表。

错误:


    An error was encountered:
    Invalid status code '400' from https://.... with error payload: "msg":"requirement failed: session isn't active."

除了那张桌子,我还能把其他桌子写成拼花。

这是我的示例代码:

from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.config("spark.sql.catalogImplementation", "in-memory").getOrCreate()

table1 = spark.read.parquet("s3://.../table1")
table1.createOrReplaceTempView("table1")

table2 = spark.read.parquet("s3://.../table2")
table2.createOrReplaceTempView("table2")

table3 = spark.read.parquet("s3://.../table3")
table3.createOrReplaceTempView("table3")

table4 = spark.read.parquet("s3://.../table4")
table4.createOrReplaceTempView("table4")

Final_table = spark.sql("""
select
      a.col1
      a.col2
...
      d.coln
 from

        table1 a
        left outer join
        table2 b
        on
        cond1
        cond2
        cond3
        left outer join
        table3 c
        on
...
        """)

Final_table.count()
# 3813731240

output_file="s3://.../final_table/"

final_table.write.option("partitionOverwriteMode", "dynamic").mode('overwrite').partitionBy("col1").parquet(output_file)

只是为了添加更多,我已经尝试重新分区但没有奏效。此外,我尝试过使用不同的 EMR 集群,例如 集群 1: 掌握 m5.24xlarge

集群 2: 掌握 m5.24xlarge 1个核心 m5.24xlarge

集群 3: 掌握 m5d.2xlarge 8核 m5d.2xlarge

EMR 发布版本 5.29.0

【问题讨论】:

您如何将工作提交给 EMR? @srikanthholur 我在 EMR 中使用 jupyter notebook,但我也尝试过 spark-submit。 设置这个属性,看看是否能解决问题。 ("spark.executor.heartbeatInterval","3600s") @srikanthholur 没有仍然得到同样的错误。 【参考方案1】:

大多数 Spark 作业都可以通过可视化其 DAG 来优化。

在这种情况下,如果您能够运行 sql 并在最短的时间内获得计数,并且您的所有时间都只用于编写,那么这里有一些建议

    由于您已经知道数据帧的计数,请删除计数操作,因为这对您的工作来说是不必要的开销。 现在您正在根据 col1 对数据进行分区,因此最好尝试重新分区您的数据,以便在撰写本文时执行最少的 shuffle。

你可以这样做

df.repartition('col1', 100).write

如果你知道的话,你也可以根据分区数来设置数字。

【讨论】:

以上是关于使用 Pyspark 在 s3 中写入镶木地板文件时出错的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 数据框写入镶木地板而不删除 /_temporary 文件夹

使用 pyspark 将镶木地板文件(在 aws s3 中)存储到 spark 数据框中

使用 pyspark 从 s3 位置读取镶木地板文件的文件夹到 pyspark 数据帧

无法使用 Pyspark 2.4.4 读取 s3 存储桶中的镶木地板文件

在 PySpark 中写入镶木地板的问题

将小 PySpark DataFrame 写入镶木地板时出现内存错误