saveAsTextFile 在 spark java.io.IOException 中挂起:数据框中的对等方重置连接

Posted

技术标签:

【中文标题】saveAsTextFile 在 spark java.io.IOException 中挂起:数据框中的对等方重置连接【英文标题】:saveAsTextFile hangs in spark java.io.IOException: Connection reset by peer in Data frame 【发布时间】:2017-09-13 06:44:04 【问题描述】:

我在 spark 中运行一个应用程序,它在两个数据帧之间进行简单的差异。 我在集群环境中作为 jar 文件执行。 我的集群环境是 94 节点集群。 有两个数据集 2 GB 和 4 GB 映射到数据帧。

对于非常小的文件,我的工作很好......

我个人认为saveAsTextFile 在我的申请中需要更多时间 在我的集群配置详细信息下方

Total Vmem allocated for Containers     394.80 GB
Total Vmem allocated for Containers     394.80 GB
Total VCores allocated for Containers   36  

这就是我运行 Spark 工作的方式

spark-submit --queue root.queue --deploy-mode client --master yarn SparkApplication-SQL-jar-with-dependencies.jar

这是我的代码。

object TestDiff 

   def main(args: Array[String]) 

    val conf = new SparkConf().setAppName("WordCount"); 

    conf.set("spark.executor.memory", "32g")
    conf.set("spark.driver.memory", "32g")
    conf.set("spark.driver.maxResultSize", "4g")

    val sc = new SparkContext(conf); //Creating spark context
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    import sqlContext.implicits._

    import org.apache.spark. SparkConf, SparkContext 
    import org.apache.spark.sql.Row
    import org.apache.spark.sql.types. StructType, StructField, StringType, DoubleType, IntegerType 
    import org.apache.spark.sql.functions.udf

     val schema = StructType(Array(
      StructField("filler1", StringType),
      StructField("dunsnumber", StringType),
      StructField("transactionalindicator", StringType)))

    import org.apache.spark.sql.functions._

    val textRdd1 = sc.textFile("/home/cloudera/TRF/PCFP/INCR")

    val rowRdd1 = textRdd1.map(line => Row.fromSeq(line.split("\\|", -1)))
    var df1 = sqlContext.createDataFrame(rowRdd1, schema)

    val textRdd2 = sc.textFile("/home/cloudera/TRF/PCFP/MAIN")

    val rowRdd2 = textRdd2.map(line => Row.fromSeq(line.split("\\|", -1)))
    var df2 = sqlContext.createDataFrame(rowRdd2, schema)

    //Finding the diff between two if any of the columns has changed 
    val diffAnyColumnDF = df1.except(df2)
    diffAnyColumnDF.rdd.coalesce(1).saveAsTextFile("Diffoutput") 


需要超过 30 分钟,然后失败。

以下例外

这是日志

Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more
17/09/15 11:55:01 WARN netty.NettyRpcEnv: Ignored message: HeartbeatResponse(false)
17/09/15 11:56:19 WARN netty.NettyRpcEndpointRef: Error sending message [message = Heartbeat(1,[Lscala.Tuple2;@7fe57079,BlockManagerId(1, c755kds.int.int.com, 33507))] in 1 attempts
org.apache.spark.rpc.RpcTimeoutException: Futures timed out after [10 seconds]. This timeout is controlled by spark.executor.heartbeatInterval
    at org.apache.spark.rpc.RpcTimeout.org$apache$spark$rpc$RpcTimeout$$createRpcTimeoutException(RpcTimeout.scala:48)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:63)
    at org.apache.spark.rpc.RpcTimeout$$anonfun$addMessageIfTimeout$1.applyOrElse(RpcTimeout.scala:59)
    at scala.runtime.AbstractPartialFunction.apply(AbstractPartialFunction.scala:33)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:76)
    at org.apache.spark.rpc.RpcEndpointRef.askWithRetry(RpcEndpointRef.scala:101)
    at org.apache.spark.executor.Executor.org$apache$spark$executor$Executor$$reportHeartBeat(Executor.scala:491)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply$mcV$sp(Executor.scala:520)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:520)
    at org.apache.spark.executor.Executor$$anon$1$$anonfun$run$1.apply(Executor.scala:520)
    at org.apache.spark.util.Utils$.logUncaughtExceptions(Utils.scala:1818)
    at org.apache.spark.executor.Executor$$anon$1.run(Executor.scala:520)
    at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
    at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
Caused by: java.util.concurrent.TimeoutException: Futures timed out after [10 seconds]
    at scala.concurrent.impl.Promise$DefaultPromise.ready(Promise.scala:219)
    at scala.concurrent.impl.Promise$DefaultPromise.result(Promise.scala:223)
    at scala.concurrent.Await$$anonfun$result$1.apply(package.scala:107)
    at scala.concurrent.BlockContext$DefaultBlockContext$.blockOn(BlockContext.scala:53)
    at scala.concurrent.Await$.result(package.scala:107)
    at org.apache.spark.rpc.RpcTimeout.awaitResult(RpcTimeout.scala:75)
    ... 14 more
17/09/15 11:56:19 WARN netty.NettyRpcEnv: Ignored message: HeartbeatResponse(false)

请建议如何调整我的 spark 作业?

我刚刚更改了执行器内存,它的工作成功了,但它非常非常慢。

conf.set("spark.executor.memory", "64g")

但是工作很慢...大约需要 15 分钟才能完成..

作业需要 15 分钟才能完成。

附加 DAG 可视化

增加超时配置后低于错误..

 executor 5): ExecutorLostFailure (executor 5 exited caused by one of the running tasks) Reason: Executor heartbeat timed out after 175200 ms

【问题讨论】:

你用的是纱线模式吗? 您还可以粘贴优化器执行计划、任务执行时间和 DAG。进一步什么是你的执行者配置?数量、内存、内核。 是的,我正在使用这样的纱线模式 --deploy-mode client --master yarn @datmannz 我的工作运行了 30 多分钟,然后它失败了...... 尝试 conf.set("spark.driver.maxResultSize", "10g") 和 repartition(10) 而不是合并 【参考方案1】:

我认为您的单个分区文件很大。通过 TCP 通道传输数据需要很长时间,并且连接无法长时间保持活动状态并被重置。

你能合并到更多的分区吗?

【讨论】:

当然我会尝试使用 500,例如 diffAnyColumnDF.rdd.coalesce(500, shuffle = true).saveAsTextFile("Diffoutput") 相同结果作业再次失败 错误是否一样?您的名称节点是否已启动,数据节点是否已启动?您可以粘贴生成的完整日志吗? 特别感兴趣的是行 ""17/09/13 06:44:44 ERROR cluster.YarnScheduler: Lost executor 1 on a0123456.int.intgroup.com: Executor heartbeat timed out after 137337 ms ."" 它清楚地表明应用程序主人无法从 d 执行者之一那里获得心跳。现在,通常需要监视网络或您的节点的原因。有意义吗? 是的,这是同样的错误,我的节点已启动 ..其他 map reduce 作业正在通过 ..在 UP 中,我可以看到在 saveAsTextFile 阶段作业需要很长时间

以上是关于saveAsTextFile 在 spark java.io.IOException 中挂起:数据框中的对等方重置连接的主要内容,如果未能解决你的问题,请参考以下文章

Spark-saveAsTextFile 分区设置

Spark-saveAsTextFile 分区设置

spark中saveAsTextFile如何最终生成一个文件

spark中saveAsTextFile如何最终生成一个文件

Spark saveAsTextFile() 导致 Mkdirs 无法为一半目录创建

saveAsTextFile 在 spark java.io.IOException 中挂起:数据框中的对等方重置连接