Spark:如何覆盖 S3 文件夹上的文件而不是完整的文件夹

Posted

技术标签:

【中文标题】Spark:如何覆盖 S3 文件夹上的文件而不是完整的文件夹【英文标题】:Spark: How to overwrite a file on S3 folder and not complete folder 【发布时间】:2019-07-09 06:38:31 【问题描述】:

使用 Spark,我正在尝试将一些数据(以 csv、parquet 格式)推送到 S3 存储桶。

df.write.mode("OVERWRITE").format("com.databricks.spark.csv").options(nullValue=options['nullValue'], header=options['header'], delimiter=options['delimiter'], quote=options['quote'], escape=options['escape']).save(destination_path)

在上面的代码片段中,destination_path 变量保存需要导出数据的 S3 存储桶位置。

例如。 destination_path = "s3://some-test-bucket/manish/"

如果我有多个文件和子文件夹,则在 some-test-bucket 的文件夹 manish 中。上面的命令将删除所有这些,spark 将写入新的输出文件。但我想用这个新文件只覆盖一个文件。

即使我能够只覆盖这个文件夹的内容,但子文件夹保持不变,即使这样在一定程度上解决了问题。

如何做到这一点?

我尝试使用模式作为追加而不是覆盖。

在这种情况下,子文件夹名称保持不变,但manish 文件夹及其子文件夹的所有内容再次被覆盖。

【问题讨论】:

【参考方案1】:

简答:将 Spark 配置参数 spark.sql.sources.partitionOverwriteMode 设置为 dynamic 而不是静态的。这只会覆盖必要的分区,而不是全部。 PySpark 示例:

conf=SparkConf().setAppName("test).set("spark.sql.sources.partitionOverwriteMode","dynamic").setMaster("yarn")
sc = SparkContext(conf=conf)
sqlContext = sql.SQLContext(sc)

【讨论】:

【参考方案2】:

可以先删除文件,然后使用追加模式插入数据,而不是覆盖保留子文件夹。以下是 Pyspark 的示例。

import subprocess
subprocess.call(["hadoop", "fs", "-rm", "*.csv.deflate".format(destination_path)])

df.write.mode("append").format("com.databricks.spark.csv").options(nullValue=options['nullValue'], header=options['header'], delimiter=options['delimiter'], quote=options['quote'], escape=options['escape']).save(destination_path)

【讨论】:

以上是关于Spark:如何覆盖 S3 文件夹上的文件而不是完整的文件夹的主要内容,如果未能解决你的问题,请参考以下文章

Amazon S3 策略只允许上传而不是覆盖 [重复]

将 json 对象文件保存为 json 数组而不是 s3 上的 json 对象

s3上的Spark Dataset Parquet分区创建临时文件夹

Spark S3A 写入省略了上传部分而没有失败

Spark - 从 S3 读取分区数据 - 分区是如何发生的?

再谈Spark下写S3文件的File Output Committer问题