来自 EMR/Spark 的 S3 写入时间极慢
Posted
技术标签:
【中文标题】来自 EMR/Spark 的 S3 写入时间极慢【英文标题】:Extremely slow S3 write times from EMR/ Spark 【发布时间】:2017-08-06 22:37:51 【问题描述】:我写信是想看看有没有人知道如何加快 EMR 中运行的 Spark 的 S3 写入时间?
我的 Spark 作业需要 4 个多小时才能完成,但集群仅在前 1.5 小时内处于负载状态。
我一直很好奇 Spark 一直在做什么。我查看了日志,发现许多s3 mv
命令,每个文件一个。然后直接看一下 S3,我发现我所有的文件都在 _temporary 目录中。
其次,我担心我的集群成本,看来我需要为这个特定任务购买 2 小时的计算。但是,我最终购买了 5 个小时。我很好奇 EMR AutoScaling 在这种情况下是否可以帮助降低成本。
一些文章讨论了更改文件输出提交器算法,但我在这方面收效甚微。
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
写入本地 HDFS 很快。我很好奇如果发出 hadoop 命令将数据复制到 S3 会更快吗?
【问题讨论】:
我在这个问题上找到了两篇很棒的文章hortonworks.github.io/hdp-aws/s3-sparkhortonworks.github.io/hdp-aws/s3-performance/index.html 最后,你使用了哪个实现?我遇到了同样的问题。 @JaspinderVirdee 将您的数据写入本地 HDFS 目录,然后使用s3-dist-cp
将您的数据复制回 S3。此外,如果您的 EMR 集群缺少 s3-dist-cp
命令,您必须在 create-cluster 命令中列出 Hadoop。例如:--applications Name=Hadoop Name=Spark Name=Ganglia Name=zeppelin
你的意思是我使用s3只是为了备份?例如 - 目前,我正在使用 Spark Streaming,并且我的数据按键分区 - 直接保存在 s3 上的“城市”。每个流批多 1 分钟的数据进来并添加到每个城市中。我将如何使用您的策略将这些数据附加到 s3 -> 保存在本地然后复制到 s3。
我的流的最后一步是保存到 fs(hdfs 或 s3)。在hdfs上成功写入后如何将更改推送到s3?甚至可以使用 scala/python 代码吗?还是仅在 shell 中使用 s3-dist-cp?
【参考方案1】:
您看到的是 outputcommitter 和 s3 的问题。
提交作业在 _temporary 文件夹上应用 fs.rename
,并且由于 S3 不支持重命名,这意味着单个请求现在正在将所有文件从 _temporary 复制并删除到其最终目的地。..
sc.hadoopConfiguration.set("mapreduce.fileoutputcommitter.algorithm.version", "2")
仅适用于 hadoop 版本 > 2.7。它的作用是在提交任务时从 _temporary 复制每个文件,而不是提交作业,因此它是分布式的并且运行速度非常快。
如果您使用旧版本的 hadoop,我会使用 Spark 1.6 并使用:
sc.hadoopConfiguration.set("spark.sql.parquet.output.committer.class","org.apache.spark.sql.parquet.DirectParquetOutputCommitter")
*请注意,它不适用于推测打开或以附加模式写入
**另请注意,它在 Spark 2.0 中已弃用(由 algorithm.version=2 代替)
顺便说一句,在我的团队中,我们实际上是使用 Spark 写入 HDFS 并在生产中使用 DISTCP 作业(特别是 s3-dist-cp)将文件复制到 S3,但这样做是出于其他几个原因(一致性、容错性)所以它没有必要.. 你可以使用我的建议快速写入 S3。
【讨论】:
更快但有风险。 Spark 2 不仅删除了该文件,如果您使用任何包含“直接”一词的提交者,您将被告知并禁用推测。 有风险,是的,但没有它会很慢,无法使用......你可以有一个火花作业,它在 5 分钟内完成并写入一个多小时,而且它也不可扩展......我不如果您想写信给 s3,不知道有什么更好的。 啊这对我不起作用,因为我使用的是附加模式! @SteveLoughran 阅读您的回答后,我明白您的意思。我可能会亲自尝试您的建议来替换当前的解决方案。 s3-dist-cp 在 hdfs 上创建临时文件,这些文件在成功时不会被删除,所以我正在丢失空间,是否有任何参数告诉 s3-dist-cp 删除临时文件。 【参考方案2】:我有类似的用例,我使用 spark 写入 s3 并遇到性能问题。主要原因是 spark 创建了许多零字节部分文件,并将临时文件替换为实际文件名减慢了写入过程。尝试以下方法作为解决方法
将 spark 的输出写入 HDFS 并使用 Hive 写入 s3。由于 hive 创建的零件文件数量较少,因此性能要好得多。我遇到的问题是(使用 spark 时也有同样的问题),由于安全原因,prod env 中没有提供对 Policy 的删除操作。在我的情况下,S3 存储桶是 kms 加密的。
将 spark 输出写入 HDFS 并将 hdfs 文件复制到本地并使用 aws s3 copy 将数据推送到 s3。使用这种方法获得了第二好的结果。与亚马逊创建票,他们建议使用这张票。
使用 s3 dist cp 将文件从 HDFS 复制到 S3。这没有问题,但性能不佳
【讨论】:
亚马逊最近可能对 s3-dist-cp 进行了改进。在我们的 EMR 集群上,它的性能非常好,在不到 2 分钟的时间内将大约 200GB 从 HDFS 复制到 S3。【参考方案3】:直接提交者从 spark 中撤出,因为它对失败没有弹性。我强烈建议不要使用它。
Hadoop s3guard 正在进行工作,以添加 0 重命名提交者,这将是 O(1) 和容错;关注HADOOP-13786。
暂时忽略“魔术提交者”,基于 Netflix 的临时提交者将首先发布(hadoop 2.9?3.0?)
-
这会将工作写入本地 FS,在任务提交中
发出未提交的多部分放置操作来写入数据,但不具体化它。
保存将 PUT 提交到 HDFS 所需的信息,使用原始的“算法 1”文件输出提交程序
实现一个作业提交,它使用 HDFS 的文件输出提交来决定完成哪些 PUT,取消哪些。
结果:任务提交需要数据/带宽秒数,但作业提交所用时间不超过在目标文件夹上执行 1-4 次 GET 并为每个待处理文件执行 POST 的时间,后者被并行化。
您可以选择这项工作所基于的提交者from netflix,并可能在今天的 Spark 中使用它。请务必设置文件提交算法 = 1(应该是默认值),否则它不会实际写入数据。
【讨论】:
史蒂夫,在 Hadoop 中包含 Netflix S3 提交程序是否存在 Apache JIRA 问题?我好像找不到。 它是 HADOOP-13786。这种集成暂时不会成为您可以掌握的任何东西,因为它正在与 S3a 第二阶段工作混合在一起。原始代码今天应该可以工作(netflix 使用它),本周有一个 Hadoop 2.8.0 RC,它具有所有的读写管道加速 当然,正如 LI 所说,您在 EMR 团队中,但您拥有自己的 s3 客户端。 netflix Staging 提交者应该在那里工作,“魔术”提交者绝对不是,但这是第二个优先于 staging 提交者github.com/steveloughran/hadoop/blob/s3guard/… 谢谢你,史蒂夫!我只是想更多地了解那个提交者,因为我还没有听说过它,我希望能够跟踪 JIRA(如果存在)。【参考方案4】:我也是同样的问题,我找到了更改 s3 协议的解决方案,最初我使用 s3a:// 来读写数据,然后我改为仅 s3:// 并且它工作得很好,实际上是我的进程追加数据。
【讨论】:
你能添加更多关于你的方法的细节吗?像 spark 和 hadoop 版本,使用 jars。【参考方案5】:您在火花输出中看到了什么? 如果你看到很多重命名操作,请阅读this
【讨论】:
【参考方案6】:我们在 Azure 上使用 WASB 上的 Spark 也遇到了同样的情况。 我们最终决定不直接将分布式存储与 spark 一起使用。 我们做了 spark.write 到一个真正的 hdfs:// 目的地并开发了一个特定的工具:hadoop copyFromLocal hdfs:// wasb:// 在归档到 WASB(或 S3)之前,HDFS 是我们的临时缓冲区。
【讨论】:
【参考方案7】:您正在编写的文件有多大?让一个核心写入一个非常大的文件将比拆分文件并让多个工作人员写入较小的文件要慢得多。
【讨论】:
以上是关于来自 EMR/Spark 的 S3 写入时间极慢的主要内容,如果未能解决你的问题,请参考以下文章
从 EMR Spark 处理 s3 gzip 文件的有效方法
Jupyter + EMR + Spark - 从本地机器上的 Jupyter notebook 连接到 EMR 集群
从 EMR spark 连接到 EMR presto - 连接失败