Job 65 cancelled because SparkContext was shut down

Posted: 2020-09-02 08:49:26

【Question】

I work on a shared Apache Zeppelin server. Almost every day, I try to run a command and get this error: Job 65 cancelled because SparkContext was shut down

I would love to learn more about what causes the SparkContext to shut down. My understanding is that Zeppelin is a Kubernetes app that sends commands to a machine for processing.

When the SparkContext shuts down, does that mean my bridge to the Spark cluster is down? And, if that is the case, how can the bridge to the Spark cluster be brought down?

In this example, it happened while I was trying to upload data to S3.

Here is the code:

val myfiles = readParquet(
    startDate = new LocalDate(2020, 4, 1),
    endDate = new LocalDate(2020, 4, 7)
)

log_events.createOrReplaceTempView("log_events")

val mySQLDF = spark.sql(s"""
    select [6 columns]
    from myfiles 
    join [other table]
    on [join_condition]
"""
)

mySQLDF.write.option("maxRecordsPerFile", 1000000).parquet(path)
// mySQLDF has 3M rows and they're all strings or dates

Here is the stack trace:

org.apache.spark.SparkException: Job aborted.
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:198)
  at org.apache.spark.sql.execution.datasources.InsertIntoHadoopFsRelationCommand.run(InsertIntoHadoopFsRelationCommand.scala:159)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult$lzycompute(commands.scala:104)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.sideEffectResult(commands.scala:102)
  at org.apache.spark.sql.execution.command.DataWritingCommandExec.doExecute(commands.scala:122)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:131)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$1.apply(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.SparkPlan$$anonfun$executeQuery$1.apply(SparkPlan.scala:156)
  at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
  at org.apache.spark.sql.execution.SparkPlan.executeQuery(SparkPlan.scala:152)
  at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:127)
  at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:80)
  at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:80)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter$$anonfun$runCommand$1.apply(DataFrameWriter.scala:676)
  at org.apache.spark.sql.execution.SQLExecution$$anonfun$withNewExecutionId$1.apply(SQLExecution.scala:78)
  at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:125)
  at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:73)
  at org.apache.spark.sql.DataFrameWriter.runCommand(DataFrameWriter.scala:676)
  at org.apache.spark.sql.DataFrameWriter.saveToV1Source(DataFrameWriter.scala:285)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:271)
  at org.apache.spark.sql.DataFrameWriter.save(DataFrameWriter.scala:229)
  at org.apache.spark.sql.DataFrameWriter.parquet(DataFrameWriter.scala:566)
  ... 48 elided
Caused by: org.apache.spark.SparkException: Job 44 cancelled because SparkContext was shut down
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:972)
  at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:970)
  at scala.collection.mutable.HashSet.foreach(HashSet.scala:78)
  at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:970)
  at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:2286)
  at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
  at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:2193)
  at org.apache.spark.SparkContext$$anonfun$stop$6.apply$mcV$sp(SparkContext.scala:1949)
  at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1340)
  at org.apache.spark.SparkContext.stop(SparkContext.scala:1948)
  at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:121)
  at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:777)
  at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
  at org.apache.spark.sql.execution.datasources.FileFormatWriter$.write(FileFormatWriter.scala:167)
  ... 70 more

【Comments】

Does this answer your question? Job cancelled because SparkContext was shut down

No, because I'm using Spark and Hadoop.

Can you post the full stack trace? And, if possible, the code you are executing? It's mostly because of some memory issue.

Great, I've posted the stack trace and the code. I gave one example, but this happens on many different blocks of code.

【Answer 1】

Your job is aborted at the write step. Job aborted. is the exception message raised as the Spark context is shut down.

Look into optimizing the write step; maxRecordsPerFile could be the culprit. Maybe try a smaller number: you currently have 1M records per file!
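
A minimal sketch of that suggestion, assuming the same mySQLDF and path as in the question (the partition count and the 200000 figure are illustrative values, not numbers from the answer):

// Write fewer records per file and spread the write across more tasks.
// repartition(64) and maxRecordsPerFile = 200000 are illustrative; tune them to your data and cluster.
mySQLDF
  .repartition(64)
  .write
  .option("maxRecordsPerFile", 200000)
  .parquet(path)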


In general, Job $job.jobId cancelled because SparkContext was shut down simply means that an exception occurred, so the DAG could not continue and had to fail. It is the Spark scheduler throwing this error when it hits an exception, which could be an unhandled exception in your code or a job failure for any other reason. And once the DAG scheduler stops, the whole application stops (this message is part of the cleanup).


Your questions:

When the SparkContext shuts down, does that mean my bridge to the Spark cluster is down?

The SparkContext represents the connection to the Spark cluster, so if it is dead, it means you can no longer run jobs on it because you have lost the link! In Zeppelin you can simply restart the SparkContext (Menu -> Interpreter -> Spark Interpreter -> Restart).

And, if that is the case, how can the bridge to the Spark cluster be brought down?

Through a SparkException/Error in a job, or manually with sc.stop().
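
A minimal sketch, assuming a spark-shell or Zeppelin Spark-interpreter session where sc is the active SparkContext:

// Stopping the context manually reproduces the situation described above:
// any job submitted afterwards fails with "... cancelled because SparkContext was shut down".
sc.stop()

// Before submitting more work, you can check whether the context has already been shut down.
if (sc.isStopped) {
  println("SparkContext is shut down; restart the Spark interpreter before running more jobs.")
}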

【Discussion】

Thanks, how do I restart the SparkContext?

In Zeppelin, go to Menu -> Interpreter -> Spark Interpreter -> click Restart (or there is also a hack mentioned here).
