Spark 流式传输作业在被驱动程序停止后失败
Posted
技术标签:
【中文标题】Spark 流式传输作业在被驱动程序停止后失败【英文标题】:Spark streaming job fails after getting stopped by Driver 【发布时间】:2016-05-09 12:54:46 【问题描述】:我有一个 spark 流作业,它从 Kafka 读取数据并对其进行一些操作。我在纱线集群 Spark 1.4.1 上运行该作业,该集群有两个节点,每个节点有 16 GB RAM,每个节点有 16 个内核。
我已将这些 conf 传递给 spark-submit 作业:
--master yarn-cluster --num-executors 3 --driver-memory 4g --executor-memory 2g --executor-cores 3
作业返回此错误并在运行一小会后完成:
INFO yarn.ApplicationMaster: Final app status: FAILED, exitCode: 11,
(reason: Max number of executor failures reached)
.....
ERROR scheduler.ReceiverTracker: Deregistered receiver for stream 0:
Stopped by driver
更新:
这些日志也被发现了:
INFO yarn.YarnAllocator: Received 3 containers from YARN, launching executors on 3 of them.....
INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down.
....
INFO yarn.YarnAllocator: Received 2 containers from YARN, launching executors on 2 of them.
INFO yarn.ExecutorRunnable: Starting Executor Container.....
INFO yarn.ApplicationMaster$AMEndpoint: Driver terminated or disconnected! Shutting down...
INFO yarn.YarnAllocator: Completed container container_e10_1453801197604_0104_01_000006 (state: COMPLETE, exit status: 1)
INFO yarn.YarnAllocator: Container marked as failed: container_e10_1453801197604_0104_01_000006. Exit status: 1. Diagnostics: Exception from container-launch.
Container id: container_e10_1453801197604_0104_01_000006
Exit code: 1
Stack trace: ExitCodeException exitCode=1:
at org.apache.hadoop.util.Shell.runCommand(Shell.java:576)
at org.apache.hadoop.util.Shell.run(Shell.java:487)
at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:753)
at org.apache.hadoop.yarn.server.nodemanager.DefaultContainerExecutor.launchContainer(DefaultContainerExecutor.java:211)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:302)
at org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch.call(ContainerLaunch.java:82)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
Container exited with a non-zero exit code 1
这可能是什么原因?感谢您的帮助。
谢谢
【问题讨论】:
在此之前您可能还有其他一些错误/信息,听起来像 killing executor, lost executor 。您可以在日志中查找并发布这些错误消息吗? @RaduIonescu 我添加了一些对我来说看起来很投机的日志。你能看看吗。谢谢。 在我看来,您要么调用sparkContext.stop()
,要么在驱动程序中使用了太多内存(例如,在整个 _RDD_s 上调用 collect()
)。您可以尝试运行它,显式需要更多资源或使用小数据集来确认这一点。
我都试过了。即使数据集很小,它也会发生
你在使用 YARN 日志聚合吗?将 yarn.log-aggregation-enable 设置为 true。
【参考方案1】:
我遇到了同样的问题。我找到了一种解决方案来解决这个问题,方法是在main
函数的末尾删除sparkContext.stop()
,将stop
操作留给GC。
Spark 团队已在 Spark 核心中解决了该问题,但到目前为止,该修复只是 master 分支。我们需要等到修复程序更新到新版本中。
https://issues.apache.org/jira/browse/SPARK-12009
【讨论】:
顺便说一句,我没有在我的代码中调用 sparkContext.stop()。【参考方案2】:能否请您展示您从 kafka 读取的 scala/java 代码?我怀疑您可能没有正确创建 SparkConf。
试试类似的东西
SparkConf sparkConf = new SparkConf().setAppName("ApplicationName");
还可以尝试在 yarn-client 模式下运行应用程序并共享输出。
【讨论】:
val conf = new SparkConf().setAppName("AppName")
这就是我在 scala 代码中创建 spark conf 的方式。我现在在一个独立的集群中运行,因为这个错误让我很头疼。所以不能在yarn-client模式下尝试
只是为了深入了解您的问题,您可以在 yarn-client 模式下运行 spark-shell。
像 spark-shell --master yarn-client 一样运行 spark shell 执行示例操作并查看 val textFile = sc.textFile("file on hdfs") textFile.count()以上是关于Spark 流式传输作业在被驱动程序停止后失败的主要内容,如果未能解决你的问题,请参考以下文章
如何在不中断流式传输作业的情况下更改 spark spark 流式事件中的 json 架构?