使用 Pyspark 在 s3 中写入镶木地板文件时出错
Posted
技术标签:
【中文标题】使用 Pyspark 在 s3 中写入镶木地板文件时出错【英文标题】:Error writing parquet file in s3 with Pyspark 【发布时间】:2020-07-10 09:01:38 【问题描述】:我正在尝试读取一些表(parquet 文件)并进行一些连接并将它们写入 S3 中的 parquet 格式,但是我遇到了一个错误或花费了几个小时来编写表。
错误:
An error was encountered:
Invalid status code '400' from https://.... with error payload: "msg":"requirement failed: session isn't active."
除了那张桌子,我还能把其他桌子写成拼花。
这是我的示例代码:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
spark = SparkSession.builder.config("spark.sql.catalogImplementation", "in-memory").getOrCreate()
table1 = spark.read.parquet("s3://.../table1")
table1.createOrReplaceTempView("table1")
table2 = spark.read.parquet("s3://.../table2")
table2.createOrReplaceTempView("table2")
table3 = spark.read.parquet("s3://.../table3")
table3.createOrReplaceTempView("table3")
table4 = spark.read.parquet("s3://.../table4")
table4.createOrReplaceTempView("table4")
Final_table = spark.sql("""
select
a.col1
a.col2
...
d.coln
from
table1 a
left outer join
table2 b
on
cond1
cond2
cond3
left outer join
table3 c
on
...
""")
Final_table.count()
# 3813731240
output_file="s3://.../final_table/"
final_table.write.option("partitionOverwriteMode", "dynamic").mode('overwrite').partitionBy("col1").parquet(output_file)
只是为了添加更多,我已经尝试重新分区但没有奏效。此外,我尝试过使用不同的 EMR 集群,例如 集群 1: 掌握 m5.24xlarge
集群 2: 掌握 m5.24xlarge 1个核心 m5.24xlarge
集群 3: 掌握 m5d.2xlarge 8核 m5d.2xlarge
EMR 发布版本 5.29.0
【问题讨论】:
您如何将工作提交给 EMR? @srikanthholur 我在 EMR 中使用 jupyter notebook,但我也尝试过 spark-submit。 设置这个属性,看看是否能解决问题。("spark.executor.heartbeatInterval","3600s")
@srikanthholur 没有仍然得到同样的错误。
【参考方案1】:
大多数 Spark 作业都可以通过可视化其 DAG 来优化。
在这种情况下,如果您能够运行 sql 并在最短的时间内获得计数,并且您的所有时间都只用于编写,那么这里有一些建议
-
由于您已经知道数据帧的计数,请删除计数操作,因为这对您的工作来说是不必要的开销。
现在您正在根据 col1 对数据进行分区,因此最好尝试重新分区您的数据,以便在撰写本文时执行最少的 shuffle。
你可以这样做
df.repartition('col1', 100).write
如果你知道的话,你也可以根据分区数来设置数字。
【讨论】:
以上是关于使用 Pyspark 在 s3 中写入镶木地板文件时出错的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark 数据框写入镶木地板而不删除 /_temporary 文件夹
使用 pyspark 将镶木地板文件(在 aws s3 中)存储到 spark 数据框中
使用 pyspark 从 s3 位置读取镶木地板文件的文件夹到 pyspark 数据帧