Spark:作业重启和重试

Posted

技术标签:

【中文标题】Spark:作业重启和重试【英文标题】:Spark: Job restart and retries 【发布时间】:2017-03-17 14:43:03 【问题描述】:

假设您有 Spark + Standalone 集群管理器。您使用一些配置打开了 spark 会话,并希望使用不同的参数并行启动 SomeSparkJob 40 次。

问题

    如何在作业失败时设置重试数量? 如何在失败时以编程方式重新启动作业?如果作业因资源不足而失败,这可能很有用。我可以一一启动所有需要额外资源的工作。 如何在作业失败时重新启动 spark 应用程序? 如果作业即使同时启动也缺乏资源,这可能很有用。比起更改内核、CPU 等配置,我需要在独立集群管理器中重新启动应用程序。

我的解决方法

1) 我很确定第一点是可能的,因为在spark local mode 是可能的。我只是不知道如何在独立模式下做到这一点。 2-3) 可以在诸如spark.sparkContext().addSparkListener(new SparkListener() 之类的 spark 上下文中传递侦听器。但似乎SparkListener 缺少失败回调。

还有很多方法的文档很差。我从未使用过它们,但也许它们可以帮助解决我的问题。

spark.sparkContext().dagScheduler().runJob();
spark.sparkContext().runJob()
spark.sparkContext().submitJob()
spark.sparkContext().taskScheduler().submitTasks();
spark.sparkContext().dagScheduler().handleJobCancellation();
spark.sparkContext().statusTracker()

【问题讨论】:

【参考方案1】:

您可以使用 SparkLauncher 并控制流程。

import org.apache.spark.launcher.SparkLauncher;

   public class MyLauncher 
     public static void main(String[] args) throws Exception 
       Process spark = new SparkLauncher()
         .setAppResource("/my/app.jar")
         .setMainClass("my.spark.app.Main")
         .setMaster("local")
         .setConf(SparkLauncher.DRIVER_MEMORY, "2g")
         .launch();
       spark.waitFor();
     
   

更多详情请见API。

由于它创建进程,您可以检查进程状态并重试,例如尝试以下:

public boolean isAlive()

如果进程没有重新启动,请参阅API了解更多详细信息。

希望这可以让我们深入了解我们如何实现您在问题中提到的内容。可能有更多方法可以做同样的事情,但考虑分享这种方法。

干杯!

【讨论】:

【参考方案2】:

检查您的 spark.sql.broadcastTimeout 和 spark.broadcast.blockSize 属性,尝试增加它们。

【讨论】:

以上是关于Spark:作业重启和重试的主要内容,如果未能解决你的问题,请参考以下文章

nginx页面访问超时和重试 参数 配置

2个问题,cron作业不运行重启,kill不优雅重启

Alamofire 5 调整和重试请求

spark基于Zookeeper的HA集群重启

SQLAlchemy,以惯用的 Python 方式进行可序列化事务隔离和重试

Alamofire/RxSwift 如何在状态码 401 上自动刷新令牌和重试请求