EMR 上 Spark 中的 S3 减速错误
Posted
技术标签:
【中文标题】EMR 上 Spark 中的 S3 减速错误【英文标题】:S3 SlowDown error in Spark on EMR 【发布时间】:2018-02-16 14:32:49 【问题描述】:我在编写 parquet 文件时遇到此错误,最近开始发生这种情况
com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.model.AmazonS3Exception: Please reduce your request rate. (Service: Amazon S3; Status Code: 503; Error Code: SlowDown; Request ID: 2CA496E2AB87DC16), S3 Extended Request ID: 1dBrcqVGJU9VgoT79NAVGyN0fsbj9+6bipC7op97ZmP+zSFIuH72lN03ZtYabNIA2KaSj18a8ho=
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.handleErrorResponse(AmazonHttpClient.java:1389)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeOneRequest(AmazonHttpClient.java:902)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeHelper(AmazonHttpClient.java:607)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.doExecute(AmazonHttpClient.java:376)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.executeWithTimer(AmazonHttpClient.java:338)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.http.AmazonHttpClient.execute(AmazonHttpClient.java:287)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.invoke(AmazonS3Client.java:3826)
at com.amazon.ws.emr.hadoop.fs.shaded.com.amazonaws.services.s3.AmazonS3Client.deleteObjects(AmazonS3Client.java:1777)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:22)
at com.amazon.ws.emr.hadoop.fs.s3.lite.call.DeleteObjectsCall.perform(DeleteObjectsCall.java:7)
at com.amazon.ws.emr.hadoop.fs.s3.lite.executor.GlobalS3Executor.execute(GlobalS3Executor.java:75)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.invoke(AmazonS3LiteClient.java:176)
at com.amazon.ws.emr.hadoop.fs.s3.lite.AmazonS3LiteClient.deleteObjects(AmazonS3LiteClient.java:125)
at com.amazon.ws.emr.hadoop.fs.s3n.Jets3tNativeFileSystemStore.deleteAll(Jets3tNativeFileSystemStore.java:355)
at sun.reflect.GeneratedMethodAccessor121.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invokeMethod(RetryInvocationHandler.java:191)
at org.apache.hadoop.io.retry.RetryInvocationHandler.invoke(RetryInvocationHandler.java:102)
at com.sun.proxy.$Proxy28.deleteAll(Unknown Source)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.doSingleThreadedBatchDelete(S3NativeFileSystem.java:1331)
at com.amazon.ws.emr.hadoop.fs.s3n.S3NativeFileSystem.delete(S3NativeFileSystem.java:663)
at com.amazon.ws.emr.hadoop.fs.EmrFileSystem.delete(EmrFileSystem.java:296)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.cleanupJob(FileOutputCommitter.java:463)
at org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter.abortJob(FileOutputCommitter.java:482)
at org.apache.spark.internal.io.HadoopMapReduceCommitProtocol.abortJob(HadoopMapReduceCommitProtocol.scala:134)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply$mcV$sp(FileFormatWriter.scala:146)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$$anonfun$write$1.apply(FileFormatWriter.scala:121)
at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:57)
at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:121)
at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:101)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:58)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:56)
at org.apache.spark.sql.execution.command.ExecutedCommandExec.doExecute(commands.scala:74)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:114)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:135)
at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:132)
at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:113)
at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:87)
at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:87)
at org.apache.spark.sql.execution.datasources.DataSource.write(DataSource.scala:492)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:215)
at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:198)
at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:494)
at com.radius.network_data.ingestion.processors.NetworkDataPathProcessor.process(NetworkDataPathProcessor.scala:38)
at com.radius.network_data.ingestion.NetworkDataIngestionPipeline.com$radius$network_data$ingestion$NetworkDataIngestionPipeline$$processClient(NetworkDataIngestionPipeline.scala:51)
at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1$$anonfun$apply$1.apply(NetworkDataIngestionPipeline.scala:42)
at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1$$anonfun$apply$1.apply(NetworkDataIngestionPipeline.scala:41)
at scala.collection.immutable.Set$Set1.foreach(Set.scala:94)
at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1.apply(NetworkDataIngestionPipeline.scala:41)
at com.radius.network_data.ingestion.NetworkDataIngestionPipeline$$anonfun$run$1.apply(NetworkDataIngestionPipeline.scala:39)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.parallel.ParIterableLike$Foreach.leaf(ParIterableLike.scala:972)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply$mcV$sp(Tasks.scala:49)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$$anonfun$tryLeaf$1.apply(Tasks.scala:48)
at scala.collection.parallel.Task$class.tryLeaf(Tasks.scala:51)
at scala.collection.parallel.ParIterableLike$Foreach.tryLeaf(ParIterableLike.scala:969)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.internal(Tasks.scala:159)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.internal(Tasks.scala:443)
at scala.collection.parallel.AdaptiveWorkStealingTasks$WrappedTask$class.compute(Tasks.scala:149)
at scala.collection.parallel.AdaptiveWorkStealingForkJoinTasks$WrappedTask.compute(Tasks.scala:443)
at scala.concurrent.forkjoin.RecursiveAction.exec(RecursiveAction.java:160)
at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
任何想法可能是什么问题?我正在使用这些写入选项
result.write
.mode(SaveMode.Overwrite)
.partitionBy("entityType")
.parquet(joinedPath)
【问题讨论】:
您的吞吐量最近增加了吗?一个批次平均有多少个镶木地板文件?一般(docs.aws.amazon.com/AmazonS3/latest/dev/…) 我不认为它的吞吐量增加了很多...您认为错误可能与零件文件的数量有关吗? 【参考方案1】:显然问题是由于写入了太多非常小的部分文件,已通过减少输出分区数解决
【讨论】:
以上是关于EMR 上 Spark 中的 S3 减速错误的主要内容,如果未能解决你的问题,请参考以下文章
AWS EMR - 将数据帧保存到 S3 上的配置单元外部表中 - 出现内存泄漏错误
使用 403 写入 S3 时,在 EMR 上运行的 Spark 偶尔会失败