附加时的镶木地板文件保护

Posted

技术标签:

【中文标题】附加时的镶木地板文件保护【英文标题】:parquet files protection when appending 【发布时间】:2020-01-10 14:24:07 【问题描述】:

当我尝试对 AWS 上的大量文件执行 ETL 时遇到问题。 目标是将 JSON 文件转换为 parquet 文件。由于文件的大小,我必须逐批进行。假设我需要分 15 个批次进行,即 15 次单独运行才能转换所有批次。

我正在使用 write.mode("append").format("parquet") 在每个胶水 pyspark 作业中写入镶木地板文件来做到这一点。

我的问题是,如果一项作业因某种原因失败,那么我不知道该怎么办 - 一些分区已更新,而有些则没有,批处理中的一些文件已处理,而有些尚未处理。例如,如果我的第 9 份工作失败了,我就会陷入困境。我不想删除所有镶木地板文件重新开始,但也不想只是重新运行第 9 个作业并导致重复。

有没有办法保护 parquet 文件,仅在整个作业成功时才将新文件附加到其中?

谢谢!!

【问题讨论】:

您的存储是在 S3 中还是在虚拟机的驱动器或其他东西上? @napoleon_borntoparty S3. 【参考方案1】:

根据您的评论和我在此问题上的类似经历,我相信这是因为 S3 最终一致性。在https://docs.aws.amazon.com/AmazonS3/latest/dev/Introduction.html 处查看Amazon S3 数据一致性模型

我们发现使用分区暂存 s3a 提交者和冲突解决模式 replace 使我们的工作不会失败。 在您的 spark 作业中尝试以下参数:

spark.hadoop.fs.s3a.committer.staging.conflict-mode replace
spark.hadoop.fs.s3a.committer.name partitioned

还可以在此处阅读有关committers 的信息: https://hadoop.apache.org/docs/r3.1.1/hadoop-aws/tools/hadoop-aws/committers.html

希望这会有所帮助!

附:如果一切都失败并且我们的文件不是太大,您可以做一个 hacky 解决方案,将您的 parquet 文件保存在本地,并在您的 spark 任务完成时上传,但我个人不建议这样做。

【讨论】:

以上是关于附加时的镶木地板文件保护的主要内容,如果未能解决你的问题,请参考以下文章

读取列中具有混合数据类型的镶木地板文件

如何在读取前根据定义的模式读取 pyspark 中的镶木地板文件?

更新取决于自己的数据的镶木地板文件的最佳方法

我对镶木地板文件和 python 完全陌生,谁能告诉我如何在 pyspark 中读取带有标题的镶木地板文件

AWS Spectrum 为 AWS Glue 生成的镶木地板文件提供空白结果

Pyspark:从不同的目录加载类似的镶木地板,并将文件夹名称作为一列组合成一个 DataFrame [重复]