s3上的Spark Dataset Parquet分区创建临时文件夹

Posted

技术标签:

【中文标题】s3上的Spark Dataset Parquet分区创建临时文件夹【英文标题】:Spark Dataset Parquet partition on s3 creating temporary folder 【发布时间】:2018-02-01 23:51:43 【问题描述】:

Spark(version=2.2.0) 没有DirectParquetOutputCommitter。作为替代方案,我可以使用

dataset
    .option("mapreduce.fileoutputcommitter.algorithm.version", "2")//magic here
    .parquet("s3a://...")

避免在S3 上创建_temporary 文件夹。

在我为我的数据集设置 partitionBy 之前一切正常

dataset
    .partitionBy("a", "b")
    .option("mapreduce.fileoutputcommitter.algorithm.version", "2")//magic stop working creating _temporary on S3
    .parquet("s3a://...")

也尝试添加但没有用

spark.conf.set("mapreduce.fileoutputcommitter.algorithm.version", "2")

在 Spark 数据集中使用 partitionBy,它将创建 _temporary 并移动文件,这将成为一个非常缓慢的操作。

有任何替代或缺少的配置吗?

【问题讨论】:

【参考方案1】:

Hadoop 3.1 的 s3a 将内置一个零重命名提交器 (va HADOOP-13786)。在那之前,你可以使用它的前身,is from netflix

请注意,“算法 2”不是消除 _temp 目录的神奇步骤,只是在单个任务提交时将任务输出直接重命名为目标。如果目录列表中存在延迟一致性,仍然容易出错,并且仍然是 O(data)。您不能安全地将 v1 或 v2 提交者直接与 S3 一起使用,而不是与 Hadoop 2.x 中的 S3A 连接器一起使用

【讨论】:

是的,S3Committer!【参考方案2】:

备选方案(按照推荐 + 易用性的顺序 - 最佳):

    使用 Netflix 的 S3Committer:https://github.com/rdblue/s3committer/ 写入 HDFS,然后复制到 S3(例如通过 s3distcp) 不要使用 partitionBy,而是遍历所有分区排列并将结果动态写入每个分区目录 编写自定义文件提交器

【讨论】:

嗯,我想避免从我的 Scala 代码中调用 Bash 命令 完全明白,那么使用 s3distcp 的#1 就出来了。 #2 是最好的选择,除非您想通过使用 spark 以 blob/bytes 的形式读取和写入 s3 来获得 hacky。我已经有一段时间没有玩过使用 Algo 2 分区的 s3 提交了,尤其是以一种无菌(非专有云优化)的方式。 您可以在 spark 本身中重新实现基本的 distcp,不需要太多努力:github.com/hortonworks-spark/cloud-integration/blob/master/…

以上是关于s3上的Spark Dataset Parquet分区创建临时文件夹的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spark 通过 s3a 将 parquet 文件写入 s3 非常慢

S3 Select 会加速 Parquet 文件的 Spark 分析吗?

Spark:如何覆盖 S3 文件夹上的文件而不是完整的文件夹

Databricks 上的 Spark - 缓存 Hive 表

使用 saveAsTable 将 parquet 数据写入 S3 未完成

显示在 Spark+Parquet 程序中读取的字节数