s3上的Spark Dataset Parquet分区创建临时文件夹
Posted
技术标签:
【中文标题】s3上的Spark Dataset Parquet分区创建临时文件夹【英文标题】:Spark Dataset Parquet partition on s3 creating temporary folder 【发布时间】:2018-02-01 23:51:43 【问题描述】:Spark(version=2.2.0)
没有DirectParquetOutputCommitter
。作为替代方案,我可以使用
dataset
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")//magic here
.parquet("s3a://...")
避免在S3
上创建_temporary
文件夹。
在我为我的数据集设置 partitionBy
之前一切正常
dataset
.partitionBy("a", "b")
.option("mapreduce.fileoutputcommitter.algorithm.version", "2")//magic stop working creating _temporary on S3
.parquet("s3a://...")
也尝试添加但没有用
spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
在 Spark 数据集中使用 partitionBy
,它将创建 _temporary
并移动文件,这将成为一个非常缓慢的操作。
有任何替代或缺少的配置吗?
【问题讨论】:
【参考方案1】:Hadoop 3.1 的 s3a 将内置一个零重命名提交器 (va HADOOP-13786)。在那之前,你可以使用它的前身,is from netflix
请注意,“算法 2”不是消除 _temp 目录的神奇步骤,只是在单个任务提交时将任务输出直接重命名为目标。如果目录列表中存在延迟一致性,仍然容易出错,并且仍然是 O(data)。您不能安全地将 v1 或 v2 提交者直接与 S3 一起使用,而不是与 Hadoop 2.x 中的 S3A 连接器一起使用
【讨论】:
是的,S3Committer!【参考方案2】:备选方案(按照推荐 + 易用性的顺序 - 最佳):
-
使用 Netflix 的 S3Committer:https://github.com/rdblue/s3committer/
写入 HDFS,然后复制到 S3(例如通过 s3distcp)
不要使用 partitionBy,而是遍历所有分区排列并将结果动态写入每个分区目录
编写自定义文件提交器
【讨论】:
嗯,我想避免从我的 Scala 代码中调用 Bash 命令 完全明白,那么使用 s3distcp 的#1 就出来了。 #2 是最好的选择,除非您想通过使用 spark 以 blob/bytes 的形式读取和写入 s3 来获得 hacky。我已经有一段时间没有玩过使用 Algo 2 分区的 s3 提交了,尤其是以一种无菌(非专有云优化)的方式。 您可以在 spark 本身中重新实现基本的 distcp,不需要太多努力:github.com/hortonworks-spark/cloud-integration/blob/master/…以上是关于s3上的Spark Dataset Parquet分区创建临时文件夹的主要内容,如果未能解决你的问题,请参考以下文章
使用 Spark 通过 s3a 将 parquet 文件写入 s3 非常慢
S3 Select 会加速 Parquet 文件的 Spark 分析吗?
Spark:如何覆盖 S3 文件夹上的文件而不是完整的文件夹
Databricks 上的 Spark - 缓存 Hive 表