使用 Spark 通过 s3a 将 parquet 文件写入 s3 非常慢

Posted

技术标签:

【中文标题】使用 Spark 通过 s3a 将 parquet 文件写入 s3 非常慢【英文标题】:Using Spark to write a parquet file to s3 over s3a is very slow 【发布时间】:2016-08-23 23:59:50 【问题描述】:

我正在尝试使用Spark 1.6.1parquet 文件写入Amazon S3。我生成的小parquet~2GB,一旦写入,它就没有那么多数据。我试图证明Spark 是我可以使用的平台。

基本上我要做的是设置一个star schemadataframes,然后我要把这些表写成镶木地板。数据来自供应商提供的 csv 文件,我使用 Spark 作为ETL 平台。我目前在ec2(r3.2xlarge) 中有一个 3 节点集群,所以在执行程序上的内存为120GB,总共有 16 个内核。

输入文件总共大约 22GB,我现在正在提取大约 2GB 的数据。最终,当我开始加载完整数据集时,这将是许多 TB。

这是我的 spark/scala pseudocode:

  def loadStage(): Unit = 
    sc.hadoopConfiguration.set("fs.s3a.buffer.dir", "/tmp/tempData")
    sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
    sc.hadoopConfiguration.set("spark.sql.hive.convertMetastoreParquet","false")
    var sqlCtx = new SQLContext(sc)


    val DataFile = sc.textFile("s3a://my-bucket/archive/*/file*.gz")

    //Setup header table/df
    val header_rec = DataFile.map(_.split("\\|")).filter(x=> x(0) == "1")
    val headerSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val headerSchema = StructType(headerSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val headerRecords = header_rec.map(p => Row(p(3), p(8), p(1), p(2), p(4), p(5), p(6) ))
    val header = sqlCtx.createDataFrame(headerRecords, headerSchema)
    header.registerTempTable("header")
    sqlCtx.cacheTable("header")


    //Setup fact table/df
    val fact_recs = DataFile.map(_.split("\\|")).filter(x=> x(0) == "2")
    val factSchemaDef = "market_no,rel_date,field1, field2, field3....."
    val factSchema = StructType(factSchemaDef.split(",").map(fieldName => StructField(fieldName, StringType,false)))
    val records = fact_recs.map(p => Row(p(11), p(12), p(1), p(2), p(3), p(4), p(5), p(6), p(7), p(8), p(9), p(10)))
    val df = sqlCtx.createDataFrame(records, factSchema)
    df.registerTempTable("fact")

    val results = sqlCtx.sql("select fact.* from header inner join fact on fact.market_no = header.market_no and fact.rel_date = header.rel_date")


    println(results.count())



    results.coalesce(1).write.mode(SaveMode.Overwrite).parquet("s3a://my-bucket/a/joined_data.parquet")


  

465884512 行的计数大约需要 2 分钟。写入 parquet 需要 38 分钟

我知道coalesce 对执行写入操作的驱动程序进行了洗牌......但它所花费的时间让我觉得我做错了什么。如果没有coalesce,这仍然需要 15 分钟,IMO 仍然太长,并且给了我大量的小 parquet 文件。我希望每天拥有一个大文件的数据。我也有代码来执行按字段值分区,而且它同样慢。我也尝试将其输出到csv,这需要大约 1 小时。

另外,我在提交作业时并没有真正设置运行时道具。我的一项工作的控制台统计数据是:

活着的工人:2 正在使用的核心:总共 16 个,已使用 16 个 正在使用的内存:总计 117.5 GB,已使用 107.5 GB 申请:1 个正在运行,5 个已完成 驱动程序:0 个正在运行,0 个已完成 状态:活着

【问题讨论】:

a coalesce 不会对驱动程序进行改组,而是在执行程序之间改组,但这与您看到的问题无关。你在使用电子病历吗?如果是这样,请使用 s3:// 而不是 s3a://。无论哪种方式,在 Spark 1.6 上,您都应该像 @David 所说的那样使用 Direct OutputCommitter。另一个可能的改进是将 parquet.enable.summary-metadata 设置为 false 在 S3 前使用 Alluxio 是否会加快速度? 【参考方案1】:

Spark 默认值会在 I/O 操作期间导致大量(可能)不必要的开销,尤其是在写入 S3 时。 This article 对此进行了更彻底的讨论,但您需要考虑更改 2 个设置。

使用 DirectParquetOutputCommitter。默认情况下,Spark 会将所有数据保存到一个临时文件夹,然后再移动这些文件。使用 DirectParquetOutputCommitter 将通过直接写入 S3 输出路径来节省时间

No longer available in Spark 2.0+ 如 jira 票中所述,当前的解决方案是
    将您的代码切换为使用 s3a 和 Hadoop 2.7.2+;它更全面,在 Hadoop 2.8 中变得更好,并且是 s3guard 的基础 使用 Hadoop FileOutputCommitter 并将 mapreduce.fileoutputcommitter.algorithm.version 设置为 2

-从Spark 1.5 开始,模式合并默认关闭。 关闭模式合并。如果模式合并打开,驱动程序节点将扫描所有文件以确保模式一致。这是特别昂贵的,因为它不是分布式操作。确保通过以下操作将其关闭

val file = sqx.read.option("mergeSchema", "false").parquet(path)

【讨论】:

从 Spark 2.0 开始,DirectParquetOutputCommitter 不再可用。新解决方案见SPARK-10063 @TalJoffe 您尝试过他们的解决方案吗?如果是这样,它是如何工作的?你能回答吗? 如果性能几乎相同,这是否意味着它不是一个真正有效的解决方案? @zzztimbo 我认为他的评论是指所指出的解决方法以及已弃用的 DirectParquetOutputCommitter(因此比编写镶木地板文件的开箱即用方式更好)。但是,我还没有尝试过。 从 1.5.0 spark.apache.org/docs/latest/…987654325@开始,Merge schema 默认也是 false 【参考方案2】:

直接输出提交者从 spark 代码库中消失了;您将在自己的 JAR 中编写/恢复已删除的代码。如果您这样做,请在您的工作中关闭猜测,并知道其他故障也可能导致问题,其中问题是“无效数据”。

值得一提的是,Hadoop 2.8 将增加一些 S3A 加速,专门用于从 S3 读取优化的二进制格式(ORC、Parquet);有关详细信息,请参阅HADOOP-11694。有些人正在努力使用 Amazon Dynamo 来实现一致的元数据存储,这应该能够在工作结束时进行稳健的 O(1) 提交。

【讨论】:

【参考方案3】:

加快 Spark 写入 S3 的直接方法之一是使用 EMRFS S3-optimized Committer 。

然而,如果你使用 s3a 这个提交者cannot be used:

未使用 EMRFS S3 优化的 Committer 时

以下情况下不使用committer:

When writing to HDFS

-> When using the S3A file system

When using an output format other than Parquet, such as ORC or text

When using MapReduce or Spark's RDD API

我在 AWS EMR 5.26 上测试了这种差异,使用 s3:// 比 s3a:// 快 15%-30%(但仍然很慢)。

我设法完成这种复制/写入的最快方法是将 Parquet 写入本地 HDFS,然后使用 s3distcp 复制到 S3;在一个特定场景(数百个小文件)中,这比将 DataFrame 直接写入 Parquet 到 S3 快 5 倍。

【讨论】:

+1 首先写入 HDFS,然后将这些文件移动到 s3(尽管我使用 gnu parallel + aws cli 命令而不是 s3distcp)。不过,这绝对取决于您的数据,这并不是万能的解决方案。【参考方案4】:

我也有这个问题。除了其他人所说的之外,这是来自 AWS 的完整解释:https://aws.amazon.com/blogs/big-data/improve-apache-spark-write-performance-on-apache-parquet-formats-with-the-emrfs-s3-optimized-committer/

在我的实验中,只是更改为 FileOutCommiter v2(从 v1)改进了 3-4 倍的写入。

self.sc._jsc.hadoopConfiguration().set("mapreduce.fileoutputcommitter.algorithm.version", "2")

【讨论】:

以上是关于使用 Spark 通过 s3a 将 parquet 文件写入 s3 非常慢的主要内容,如果未能解决你的问题,请参考以下文章

您可以在 Spark/Hadoop 中将 s3:// 翻译(或别名)为 s3a:// 吗?

Apache Spark错误使用hadoop将数据卸载到AWS S3

spark sql 无法在 S3 中查询镶木地板分区

Apache Spark Hadoop S3A SignatureDoesNotMatch

Amazon s3a 使用 Spark 返回 400 Bad Request

如何从 Apache Spark 访问 s3a:// 文件?