Spark throws java.io.IOException: Failed to rename when saving part-xxxxx.gz
Posted: 2018-06-26 20:02:07

Question:

New Spark user here. I am extracting features from many .tif images stored on AWS S3, each with an identifier like 02_R4_C7. I am using Spark 2.2.1 and Hadoop 2.7.2.
I am using all the default configurations, like so:
from pyspark import SparkConf, SparkContext
from pyspark.sql import SQLContext

conf = SparkConf().setAppName("Feature Extraction")
sc = SparkContext(conf=conf)
sc.setLogLevel("ERROR")
sqlContext = SQLContext(sc)
And here is the function call that fails after some features have already been saved successfully as part-xxxx.gz files in their image-id folders:
features_labels_rdd.saveAsTextFile(text_rdd_direct,"org.apache.hadoop.io.compress.GzipCodec")
See the error below. When I delete the feature part-xxxx.gz files that were created successfully and re-run the script, it fails at a different image and part-xxxx.gz file in a seemingly nondeterministic way. I make sure to delete all the features before re-running. My theory is that two workers try to create the same temporary file and conflict with each other, since there are two identical error messages for the same file, one second apart.
I am at a loss about what to do. I see that Spark lists configurations that can change how it handles tasks, but I am not sure what would help here since I don't understand the problem I am having. Any help is greatly appreciated!
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/26 19:24:40 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/26 19:24:41 WARN spark.SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
n images = 512
Feature file of 02_R4_C7 is created
[Stage 3:=================> (6 + 14) / 20]18/06/26 19:24:58 ERROR mapred.SparkHadoopMapRedUtil: Error committing the output of task: attempt_20180626192453_0003_m_000007_59
java.io.IOException: Failed to rename FileStatuspath=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:172)
at org.apache.hadoop.mapred.OutputCommitter.commitTask(OutputCommitter.java:343)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
at org.apache.spark.internal.io.SparkHadoopWriter.commit(SparkHadoopWriter.scala:105)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1146)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
[Stage 3:=====================================> (13 + 7) / 20]18/06/26 19:24:58 ERROR executor.Executor: Exception in task 7.0 in stage 3.0 (TID 59)
java.io.IOException: Failed to rename FileStatuspath=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:415)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.mergePaths(FileOutputCommitter.java:428)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.commitTask(FileOutputCommitter.java:539)
at org.apache.hadoop.mapred.FileOutputCommitter.commitTask(FileOutputCommitter.java:172)
at org.apache.hadoop.mapred.OutputCommitter.commitTask(OutputCommitter.java:343)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.performCommit$1(SparkHadoopMapRedUtil.scala:50)
at org.apache.spark.mapred.SparkHadoopMapRedUtil$.commitTask(SparkHadoopMapRedUtil.scala:76)
at org.apache.spark.internal.io.SparkHadoopWriter.commit(SparkHadoopWriter.scala:105)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1146)
at org.apache.spark.rdd.PairRDDFunctions$$anonfun$saveAsHadoopDataset$1$$anonfun$12.apply(PairRDDFunctions.scala:1125)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:108)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/06/26 19:24:58 ERROR scheduler.TaskSetManager: Task 7 in stage 3.0 failed 1 times; aborting job
Traceback (most recent call last):
File "run_feature_extraction_spark.py", line 88, in <module>
main(sc)
File "run_feature_extraction_spark.py", line 75, in main
features_labels_rdd.saveAsTextFile(text_rdd_direct, "org.apache.hadoop.io.compress.GzipCodec")
File "/home/ubuntu/.local/lib/python2.7/site-packages/pyspark/rdd.py", line 1551, in saveAsTextFile
keyed._jrdd.map(self.ctx._jvm.BytesToString()).saveAsTextFile(path, compressionCodec)
File "/home/ubuntu/.local/lib/python2.7/site-packages/py4j/java_gateway.py", line 1133, in __call__
answer, self.gateway_client, self.target_id, self.name)
File "/home/ubuntu/.local/lib/python2.7/site-packages/pyspark/sql/utils.py", line 63, in deco
return f(*a, **kw)
File "/home/ubuntu/.local/lib/python2.7/site-packages/py4j/protocol.py", line 319, in get_return_value
format(target_id, ".", name), value)
py4j.protocol.Py4JJavaError: An error occurred while calling o76.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 7 in stage 3.0 failed 1 times, most recent failure: Lost task 7.0 in stage 3.0 (TID 59, localhost, executor driver): java.io.IOException: Failed to rename FileStatuspath=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/_temporary/0/_temporary/attempt_20180626192453_0003_m_000007_59/part-00007.gz; isDirectory=false; length=952309; replication=1; blocksize=67108864; modification_time=1530041098000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C6/part-00007.gz
When I run it again, the script gets further, but fails with the same error on a different image folder and part-xxxx.gz file:
SLF4J: Class path contains multiple SLF4J bindings.
SLF4J: Found binding in [jar:file:/usr/local/spark/jars/slf4j-log4j12-1.7.16.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: Found binding in [jar:file:/usr/local/hadoop/share/hadoop/common/lib/slf4j-log4j12-1.7.10.jar!/org/slf4j/impl/StaticLoggerBinder.class]
SLF4J: See http://www.slf4j.org/codes.html#multiple_bindings for an explanation.
SLF4J: Actual binding is of type [org.slf4j.impl.Log4jLoggerFactory]
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
18/06/26 19:37:24 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/06/26 19:37:24 WARN spark.SparkConf: In Spark 1.0 and later spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone and LOCAL_DIRS in YARN).
n images = 512
Feature file of 02_R4_C7 is created
Feature file of 02_R4_C6 is created
Feature file of 02_R4_C5 is created
Feature file of 02_R4_C4 is created
Feature file of 02_R4_C3 is created
Feature file of 02_R4_C2 is created
Feature file of 02_R4_C1 is created
[Stage 15:==========================================> (15 + 5) / 20]18/06/26 19:38:16 ERROR mapred.SparkHadoopMapRedUtil: Error committing the output of task: attempt_20180626193811_0015_m_000017_285
java.io.IOException: Failed to rename FileStatuspath=s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C0/_temporary/0/_temporary/attempt_20180626193811_0015_m_000017_285/part-00017.gz; isDirectory=false; length=896020; replication=1; blocksize=67108864; modification_time=1530041897000; access_time=0; owner=; group=; permission=rw-rw-rw-; isSymlink=false to s3n://activemapper/imagery/southafrica/wv2/RDD48FeaturesTextFile/02_R4_C0/part-00017.gz
Answer 1:

Without a "consistency layer" (Consistent EMR, or, from the Apache Hadoop project itself, S3Guard), or a special output committer designed explicitly for working with S3 (the Hadoop 3.1+ "S3A committers"), you cannot safely use S3 as a direct destination of work. Rename is where it fails, because listing inconsistency means the scan for files to copy may miss data, or find deleted files which it cannot rename. Your stack trace looks exactly as I would expect it to: job commits failing apparently at random.
Without going into the details, here is a video of Ryan Blue on the topic.
Workaround: write to the local cluster FS, then use distcp to upload to S3.
PS: for Hadoop 2.7+, switch to the s3a:// connector. It has exactly the same consistency problem without S3Guard enabled, but better performance.
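A rough sketch of that workaround, with hypothetical placeholder paths and bucket name (not taken from the question):

```shell
# Sketch: have the Spark job save to the cluster's HDFS instead of S3,
# e.g. features_labels_rdd.saveAsTextFile("hdfs:///tmp/features",
#          "org.apache.hadoop.io.compress.GzipCodec")
# then, once the job succeeds, copy the finished output up in one pass,
# using the s3a:// connector rather than s3n://
hadoop distcp hdfs:///tmp/features s3a://my-bucket/features
```

Since the copy only happens after the job has committed successfully on a real filesystem, the rename-based commit never touches S3 at all.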
Comments:

- I ran into a similar issue using df.write.partitionBy('col') with s3a://. I never had this problem before I started writing with partitionBy. Any suggestions on why partitionBy causes these rename errors? Thanks!
- tanner, this is related to 404 caching: a HEAD for a file can cache a 404 in the S3 load balancer for at least 10s, longer if more HEAD calls are made. It is probably creating some short files, and the gap between create() and close() on the returned stream is so short that this surfaces. Fix: use the new committers in the Hadoop 3.1+ artifacts.
- @Steve Loughran I am not using S3, but I hit this problem when writing to my EMR hdfs:// path... how can I resolve this, please advise?
- @Steve Loughran Could you suggest how to handle this in HDFS? ***.com/questions/62036791/…
- This is nothing to do with HDFS: it is not a consistency or 404-caching problem there. Also: EMR is a closed-source fork of Hadoop, Spark, etc. Raise a support issue with them.

Answer 2:

The solution in @Steve Loughran's post is great. Just adding some information to help explain the problem.
Hadoop 2.7 uses the Hadoop commit protocol for committing. When Spark saves a result to S3, it actually first saves a temporary result to S3, and makes it visible by renaming it once the job succeeds (the reasons and details can be found in this great doc). However, S3 is an object store and has no real "rename": it copies the data to the target object, then deletes the original object.
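As a toy illustration of that commit protocol, here is a pure-Python sketch on a local filesystem, where rename really is a single atomic operation, unlike on S3. The directory layout and names are simplified stand-ins for what the real (Java) FileOutputCommitter does:

```python
import os
import tempfile

def commit_task(out_dir, part_name, data):
    """Write a task's output under _temporary, then rename it into place."""
    tmp_dir = os.path.join(out_dir, "_temporary")
    os.makedirs(tmp_dir, exist_ok=True)
    tmp_path = os.path.join(tmp_dir, part_name)
    with open(tmp_path, "w") as f:
        f.write(data)                    # the task attempt writes here
    final_path = os.path.join(out_dir, part_name)
    # The commit step: one atomic metadata operation on a real filesystem.
    # On S3 there is no equivalent; this becomes copy + delete.
    os.replace(tmp_path, final_path)
    return final_path

out_dir = tempfile.mkdtemp()
final = commit_task(out_dir, "part-00007", "features for 02_R4_C7")
print(os.path.basename(final))  # → part-00007
```

Until `os.replace` runs, readers of `out_dir` never see a half-written part file; that invisibility guarantee is exactly what the copy-plus-delete emulation on S3 cannot provide.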
S3 is "eventually consistent", which means the delete may take effect before the copy is fully synchronized. When that happens, the rename fails.
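A toy model of that failure mode (pure Python, no AWS calls; all names are invented for illustration): rename is emulated as copy plus delete, and a stale listing can still report a temp file that is already gone, so a retried commit hits exactly this IOException:

```python
class ToyObjectStore:
    """Eventually consistent object store: LIST results can lag PUT/DELETE."""

    def __init__(self):
        self.objects = {}        # the true, current object state
        self.stale_keys = set()  # deleted keys a LIST may still report

    def put(self, key, data):
        self.objects[key] = data

    def delete(self, key):
        self.objects.pop(key, None)
        self.stale_keys.add(key)  # deletion not yet visible to listings

    def list_keys(self):
        # an inconsistent listing: real keys plus not-yet-purged ghosts
        return set(self.objects) | self.stale_keys

    def rename(self, src, dst):
        # object stores have no rename: copy to dst, then delete src
        if src not in self.objects:
            raise IOError("Failed to rename %s to %s" % (src, dst))
        self.objects[dst] = self.objects[src]
        self.delete(src)

store = ToyObjectStore()
store.put("_temporary/part-00007.gz", "data")
store.rename("_temporary/part-00007.gz", "part-00007.gz")  # succeeds

# A committer re-scanning the listing still "sees" the temp file...
assert "_temporary/part-00007.gz" in store.list_keys()
try:
    store.rename("_temporary/part-00007.gz", "part-00007.gz")
except IOError as e:
    print(e)  # ...but renaming it fails: the object is already gone
```

The second rename attempt is the analogue of a speculative or retried task committing from a stale view, which matches the "two identical errors a second apart" symptom in the question.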
In my case, this was only triggered in some chained jobs; I have never seen it in a simple save job.
Comments:
- I found another cause: a 404 cached in the S3 load balancer. If anything probes for a file with a HEAD call before it exists, the 404 can be cached and can keep being returned for as much as 20-30 seconds afterwards. Listings see the file, but the copy fails. We are tuning this in S3A to drop those checks when creating files with overwrite==true, but it will take a while before that ships in hadoop-3.2.x. For now: leave a gap between create and rename, and don't probe for the file.
- I hit the same rename error when writing with spark.write.partitionBy('col') to s3a://. Any idea why this happens? Any workarounds? Thanks!
- @TannerClark As mentioned in Steve Loughran's earlier post, you can write to the local cluster first and then load the results into S3 with distcp or aws s3 cp.
- Not my best work, but I am iterating over the partition field and exporting each partition separately (since I only have 45 large partitions). No failures!
- @Fang Zhang I am not using S3, but I hit this problem when writing to my EMR hdfs:// path... how can I resolve this, please advise?

Answer 3:

This happens when no threads are available to execute concurrent tasks. Setting the following property in HDFS worked:
dfs.datanode.handler.count > 10
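For reference, that property lives in hdfs-site.xml; the exact value is a judgment call (the illustrative 20 below is simply anything above the default of 10):

```xml
<!-- hdfs-site.xml: number of DataNode server threads (default 10) -->
<property>
  <name>dfs.datanode.handler.count</name>
  <value>20</value>
</property>
```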