Spark:如何覆盖 S3 文件夹上的文件而不是完整的文件夹
Posted
技术标签:
【中文标题】Spark:如何覆盖 S3 文件夹上的文件而不是完整的文件夹【英文标题】:Spark: How to overwrite a file on S3 folder and not complete folder 【发布时间】:2019-07-09 06:38:31 【问题描述】:使用 Spark,我正在尝试将一些数据(以 csv、parquet 格式)推送到 S3 存储桶。
df.write.mode("OVERWRITE").format("com.databricks.spark.csv").options(nullValue=options['nullValue'], header=options['header'], delimiter=options['delimiter'], quote=options['quote'], escape=options['escape']).save(destination_path)
在上面的代码片段中,destination_path 变量保存需要导出数据的 S3 存储桶位置。
例如。 destination_path = "s3://some-test-bucket/manish/"
如果我有多个文件和子文件夹,则在 some-test-bucket
的文件夹 manish
中。上面的命令将删除所有这些,spark 将写入新的输出文件。但我想用这个新文件只覆盖一个文件。
即使我能够只覆盖这个文件夹的内容,但子文件夹保持不变,即使这样在一定程度上解决了问题。
如何做到这一点?
我尝试使用模式作为追加而不是覆盖。
在这种情况下,子文件夹名称保持不变,但manish
文件夹及其子文件夹的所有内容再次被覆盖。
【问题讨论】:
【参考方案1】:简答:将 Spark 配置参数 spark.sql.sources.partitionOverwriteMode
设置为 dynamic
而不是静态的。这只会覆盖必要的分区,而不是全部。
PySpark 示例:
conf=SparkConf().setAppName("test).set("spark.sql.sources.partitionOverwriteMode","dynamic").setMaster("yarn")
sc = SparkContext(conf=conf)
sqlContext = sql.SQLContext(sc)
【讨论】:
【参考方案2】:可以先删除文件,然后使用追加模式插入数据,而不是覆盖保留子文件夹。以下是 Pyspark 的示例。
import subprocess
subprocess.call(["hadoop", "fs", "-rm", "*.csv.deflate".format(destination_path)])
df.write.mode("append").format("com.databricks.spark.csv").options(nullValue=options['nullValue'], header=options['header'], delimiter=options['delimiter'], quote=options['quote'], escape=options['escape']).save(destination_path)
【讨论】:
以上是关于Spark:如何覆盖 S3 文件夹上的文件而不是完整的文件夹的主要内容,如果未能解决你的问题,请参考以下文章
将 json 对象文件保存为 json 数组而不是 s3 上的 json 对象
s3上的Spark Dataset Parquet分区创建临时文件夹