Java Web提交任务到Spark Standalone集群并监控

Posted fansy1990

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Java Web提交任务到Spark Standalone集群并监控相关的知识,希望对你有一定的参考价值。

Java Web提交任务到Spark Standalone集群并监控

1. 环境

软件版本备注
IDEA14.1.5
JDK1.8
Spark1.6.0工程maven引用
Sparkcdh5.7.3-spark1.6.0实际集群5.7.3-1.cdh5.7.3.p0.5
Hadoop2.6.4工程Maven引用
Hadoop2.6.0-cdh5.7.3实际集群参数
Maven3.3

2. 工程下载路径

工程在GitHub上地址为: javaweb_spark_standalone_monitor

3. Spark任务提交流程

之前做过相关的工作,知道可以通过下面的方式来提交任务到Spark Standalone集群:

String[] arg0=new String[]
                "--master","spark://server2.tipdm.com:6066",
                "--deploy-mode","cluster",
                "--name",appName,
                "--class",className,
                "--executor-memory","2G",
                "--total-executor-cores","10",
                "--executor-cores","2",
                path,
                "/user/root/a.txt",
                "/tmp/"+System.currentTimeMillis()
        ;
        SparkSubmit.main(arg0);

1. 这里要注意的是,这里使用的模式是cluster,而非client,也就是说driver程序也是运行在集群中的,而非提交的客户端,也就是我Win10本地。
2. 如果需要使用client提交,那么需要注意本地资源是否足够;同时因为这里使用的是cluster,所以需要确保集群资源同时可以运行一个driver以及executor(即,最少需要同时运行两个Container)
3. 其中的path,也就是打的jar包需要放到集群各个slave节点中的对应位置。比如lz集群中有node1,node2,node3 ,那么就需要把wc.jar放到这三个节点上,比如放到/tmp/wc.jar ,那么path的设置就要设置为file:/opt/wc.jar ,如果直接使用/opt/wc.jar 那么在进行参数解析的时候会被解析成file:/c:/opt/wc.jar (因为lz使用的是win10运行Tomcat),从而报jar包文件找不到的错误!

进入SparkSubmit.main源码,可以看到如下代码:

def main(args: Array[String]): Unit = 
    val appArgs = new SparkSubmitArguments(args)
    if (appArgs.verbose) 
      // scalastyle:off println
      printStream.println(appArgs)
      // scalastyle:on println
    
    appArgs.action match 
      case SparkSubmitAction.SUBMIT => submit(appArgs)
      case SparkSubmitAction.KILL => kill(appArgs)
      case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
    
  

代码里面是通过submit来提交任务的,顺着这条线往下,则最终是通过

mainMethod.invoke 是通过反射来调用的,通过debug可以得到,这里反射调用的其实是:RestSubmissionClient 的main函数提交任务的。
所以这里可以模仿RestSubmissionClient来提交任务。程序如下:

public static String submit(String appResource,String mainClass,String ...args)
        SparkConf sparkConf = new SparkConf();
        // 下面的是参考任务实时提交的Debug信息编写的
        sparkConf.setMaster(MASTER);
        sparkConf.setAppName(APPNAME+" "+ System.currentTimeMillis());
        sparkConf.set("spark.executor.cores","2");
        sparkConf.set("spark.submit.deployMode","cluster");
        sparkConf.set("spark.jars",appResource);
        sparkConf.set("spark.executor.memory","2G");
        sparkConf.set("spark.cores.max","2");
        sparkConf.set("spark.driver.supervise","false");
        Map<String,String> env = filterSystemEnvironment(System.getenv());
        CreateSubmissionResponse response = null;
        try 
            response = (CreateSubmissionResponse)
                    RestSubmissionClient.run(appResource, mainClass, args, sparkConf, toScalaMap(env));
        catch (Exception e)
            e.printStackTrace();
            return null;
        
        return response.submissionId();
    

如果不加其中的

sparkConf.set

则程序运行会有问题,第一个错误就是:

java.lang.IllegalArgumentException: Invalid environment variable name: “=::”

这个错误是因为模式设置不对(没有设置cluster模式),所以在进行参数匹配的时候异常。可以看到的参数如下图所示:

这里面对应的参数,其实就是SparkSubmit提交任务所对应的值了。

4. 问题及问题解决

问题提出:
1. 最近一段时间,在想运行Spark的任务的时候为什么要提交到YARN上,而且通过实践发现,提交到YARN上程序运行比Spark Standalone运行要慢的多,所以是否能直接提交任务到Spark Standalone集群呢?
2. 提交任务到Spark Standalone集群后,如何获得任务的id,方便后面的监控呢?
3. 获得任务id后,怎么监控?

针对这三个问题,解答如下:
1. 第一个问题,应该是见仁见智的问题了,使用SparkONYARN的方式可以统一生态圈什么的;
2. 在上面的代码中已经可以提交任务,并且获取任务ID了。不过需要注意的是,通过:

response = (CreateSubmissionResponse)
RestSubmissionClient.run(appResource, mainClass, args, sparkConf, toScalaMap(env));

获取的response需要转型为CreateSubmissionResponse,才能获得submittedId,但是要访问CreateSubmissionResponse,那么需要在某些包下面才行,所以lz的SparkEngine类才会定义在org.apache.spark.deploy.rest包中。

第三:
监控,监控就更简单了,可以参考:

private def requestStatus(args: SparkSubmitArguments): Unit = 
    new RestSubmissionClient(args.master)
      .requestSubmissionStatus(args.submissionToRequestStatusFor)
  

这里就是监控的代码了,lz参考这段代码写了个监控,详见GitHub

后记

在提交任务到Spark Standalone的时候,lz发现driver和实际的任务是分开的,如下:

发现是driver 调用app,本来想着,driver是不是提交后,就Over了,结果发现driver会一直监控app的状态,如果app运行成功结束,那么driver状态就会返回FINISHED,如果失败,则driver状态也是ERROR。所以可以直接监控driver来监控整个任务。

使用Spark Standalone来运行Spark程序,确实比Spark On YARN快的多了!


分享,成长,快乐

脚踏实地,专注

转载请注明blog地址:http://blog.csdn.net/fansy1990

以上是关于Java Web提交任务到Spark Standalone集群并监控的主要内容,如果未能解决你的问题,请参考以下文章

spark提交参数解析

Spark集群任务提交流程----2.1.0源码解析

如何在Java应用中提交Spark任务?

如何通过Java程序提交yarn的MapReduce计算任务

基于Java实现编程式提交Spark任务

基于Java实现编程式提交Spark任务