Monitoring Spark Job Execution via the Spark REST Service

Posted by 斯维达夏


1. The REST Service

  To make it easier for users to monitor their jobs, Spark has provided a REST service since version 1.4. By requesting its URL, users can retrieve an application's running state.

  Spark's REST API returns information in JSON, which makes it easy for developers to build visual monitoring tools for Spark. The API covers both running applications and the history server, and every request URL is rooted at /api/v1. For the history server, for example, information is available at http://***:18080/api/v1 (the port is configurable); for a running Spark application, it is available at https://***/api/v1.
 
  Main use: the REST service makes it easy to monitor job duration, stages, and so on; combined with a time-series database, it can monitor every job on the cluster.
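As a minimal sketch of the two entry points described above (the host names and ports below are illustrative placeholders, not from this article), the base URLs can be assembled like this:

```python
# Sketch: assemble Spark REST API URLs. Hosts/ports are placeholders.

def history_server_api(host, port=18080):
    """Base URL of the history server's REST API (default port 18080)."""
    return "http://%s:%d/api/v1" % (host, port)

def application_api(host, port, app_id):
    """URL for one application's info, on a running app UI or history server."""
    return "http://%s:%d/api/v1/applications/%s" % (host, port, app_id)

print(history_server_api("historyhost"))
# http://historyhost:18080/api/v1
print(application_api("historyhost", 18080, "application_1502706935975_233268"))
# http://historyhost:18080/api/v1/applications/application_1502706935975_233268
```

The same path layout works for both cases; only the host and port differ.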

2. Example Code (Python)

  

#!/usr/bin/env python
# -*- coding: utf-8 -*-
'''
    Created by zhangy on Aug 25, 2017
    (Python 2 script: uses urllib2)
'''
import datetime
import json
import os
import time
import urllib2


if __name__ == "__main__":
    # List the IDs of running YARN applications whose name contains "Noce".
    command = "yarn application -list | grep Noce | awk -F'\\t' '{print $1}'"
    val = os.popen(command).read()
    appids = val.split("\n")
    for pid in appids:
        if pid == "":
            continue
        # Ask the Spark REST service for this application's attempt info.
        url = "http://th04-znwg-sgi620-001:18088/api/v1/applications/" + pid
        req = urllib2.Request(url)
        res_data = urllib2.urlopen(req)
        res = res_data.read()
        jo = json.loads(res)
        dict1 = jo["attempts"][0]
        st = dict1["startTime"]
        GMT_FORMAT = "%Y-%m-%dT%H:%M:%S.%fGMT"
        sti = datetime.datetime.strptime(st, GMT_FORMAT)
        # startTime is GMT; add 8 hours (assumes the host runs in UTC+8).
        startTime = time.mktime(sti.timetuple()) + 8 * 60 * 60
        nowTime = long(time.time())
        sub = nowTime - startTime
        # Kill any application that has been running for more than 4 hours.
        if sub > 4 * 60 * 60:
            killCommand = "yarn application -kill " + pid
            res = os.popen(killCommand).read()
            cc = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(float(nowTime)))
            f = open("/home/noce1/run_noce/his/monitor/" + pid + ".txt", "a")
            f.write(cc + " : pid " + pid + " : " + str(sub) + " seconds\n")
            f.write(res + "\n")
            f.close()

     The test example above only monitors a Spark job's running time: if a job exceeds the expected execution time (4 hours), it is killed to release its resources.
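The kill decision itself is just a comparison of elapsed time against a threshold. A self-contained sketch of that check (the 4-hour limit comes from the article; the function name is mine, and calendar.timegm replaces the +8h local-time adjustment used in the script, so it works in any time zone):

```python
import calendar
import datetime

# Timestamp format used by the Spark REST API (GMT).
GMT_FORMAT = "%Y-%m-%dT%H:%M:%S.%fGMT"

def should_kill(start_time_str, now_epoch, max_seconds=4 * 60 * 60):
    """True when the attempt started more than max_seconds ago.

    start_time_str: the 'startTime' string from the REST API (GMT).
    now_epoch: current time as seconds since the epoch (UTC).
    """
    st = datetime.datetime.strptime(start_time_str, GMT_FORMAT)
    # calendar.timegm treats the tuple as GMT, so no time-zone guess is needed.
    start_epoch = calendar.timegm(st.timetuple())
    return (now_epoch - start_epoch) > max_seconds
```

In the script above this would replace the mktime + 8-hour arithmetic, with the kill command issued only when should_kill(...) returns True.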

 Result:

For example:
http://132.12*****:18088/api/v1/applications/application_1502706935975_233268/

Response (JSON):
{
  "id" : "application_1502706935975_233268",
  "name" : "FRT3_73",
  "attempts" : [ {
    "startTime" : "2017-09-04T01:29:53.986GMT",
    "endTime" : "2017-09-04T01:31:52.955GMT",
    "sparkUser" : "noce1",
    "completed" : true
  } ]
}
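The fields in such a response can be pulled out with the standard json module. A small sketch using the payload above as a literal string (in practice it would come from the HTTP response body):

```python
import datetime
import json

# The JSON body shown above, as returned by /api/v1/applications/[app-id].
payload = '''{
  "id" : "application_1502706935975_233268",
  "name" : "FRT3_73",
  "attempts" : [ {
    "startTime" : "2017-09-04T01:29:53.986GMT",
    "endTime" : "2017-09-04T01:31:52.955GMT",
    "sparkUser" : "noce1",
    "completed" : true
  } ]
}'''

app = json.loads(payload)
attempt = app["attempts"][0]        # this application has a single attempt

# Wall-clock duration of the (completed) attempt:
fmt = "%Y-%m-%dT%H:%M:%S.%fGMT"
start = datetime.datetime.strptime(attempt["startTime"], fmt)
end = datetime.datetime.strptime(attempt["endTime"], fmt)
duration = (end - start).total_seconds()
print(app["name"], attempt["sparkUser"], duration)  # FRT3_73 noce1 118.969
```

Note that "attempts" is a list: an application restarted by YARN can have several attempts, so a robust monitor would inspect each one.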

 

3. Official Endpoint Reference (version 2.1.0, http:**** :18080/api/v1/)

 

Endpoint / Meaning

/applications
  A list of all applications.
  ?status=[completed|running]  list only applications in the chosen state.
  ?minDate=[date]  earliest start date/time to list.
  ?maxDate=[date]  latest start date/time to list.
  ?minEndDate=[date]  earliest end date/time to list.
  ?maxEndDate=[date]  latest end date/time to list.
  ?limit=[limit]  limits the number of applications listed.
  Examples:
    ?minDate=2015-02-10
    ?minDate=2015-02-03T16:42:40.000GMT
    ?maxDate=2015-02-11T20:41:30.000GMT
    ?minEndDate=2015-02-12
    ?minEndDate=2015-02-12T09:15:10.000GMT
    ?maxEndDate=2015-02-14T16:30:45.000GMT
    ?limit=10

/applications/[app-id]/jobs
  A list of all jobs for a given application.
  ?status=[running|succeeded|failed|unknown]  list only jobs in the specific state.

/applications/[app-id]/jobs/[job-id]
  Details for the given job.

/applications/[app-id]/stages
  A list of all stages for a given application.

/applications/[app-id]/stages/[stage-id]
  A list of all attempts for the given stage.

/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]
  Details for the given stage attempt.

/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary
  Summary metrics of all tasks in the given stage attempt.
  ?quantiles  summarize the metrics with the given quantiles.
  Example: ?quantiles=0.01,0.5,0.99

/applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskList
  A list of all tasks for the given stage attempt.
  ?offset=[offset]&length=[len]  list tasks in the given range.
  ?sortBy=[runtime|-runtime]  sort the tasks.
  Example: ?offset=10&length=50&sortBy=runtime

/applications/[app-id]/executors
  A list of all active executors for the given application.

/applications/[app-id]/allexecutors
  A list of all (active and dead) executors for the given application.

/applications/[app-id]/storage/rdd
  A list of stored RDDs for the given application.

/applications/[app-id]/storage/rdd/[rdd-id]
  Details for the storage status of a given RDD.

/applications/[base-app-id]/logs
  Download the event logs for all attempts of the given application as files within a zip file.

/applications/[base-app-id]/[attempt-id]/logs
  Download the event logs for a specific application attempt as a zip file.

/applications/[app-id]/streaming/statistics
  Statistics for the streaming context.

/applications/[app-id]/streaming/receivers
  A list of all streaming receivers.

/applications/[app-id]/streaming/receivers/[stream-id]
  Details of the given receiver.

/applications/[app-id]/streaming/batches
  A list of all retained batches.

/applications/[app-id]/streaming/batches/[batch-id]
  Details of the given batch.

/applications/[app-id]/streaming/batches/[batch-id]/operations
  A list of all output operations of the given batch.

/applications/[app-id]/streaming/batches/[batch-id]/operations/[outputOp-id]
  Details of the given operation and given batch.

/applications/[app-id]/environment
  Environment details of the given application.
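Many of these endpoints take query parameters. As an illustrative sketch (host, application ID, and stage/attempt numbers are placeholders), the filtered application list and a taskSummary request from the reference above can be composed like this:

```python
# Sketch: compose parameterized Spark REST API URLs. Placeholders throughout.
try:
    from urllib import urlencode          # Python 2
except ImportError:
    from urllib.parse import urlencode    # Python 3

BASE = "http://historyhost:18080/api/v1"  # placeholder host

# /applications?status=completed&minDate=2015-02-10&limit=10
params = urlencode([("status", "completed"), ("minDate", "2015-02-10"), ("limit", 10)])
apps_url = BASE + "/applications?" + params

# /applications/[app-id]/stages/[stage-id]/[stage-attempt-id]/taskSummary?quantiles=...
app_id, stage_id, attempt_id = "application_1502706935975_233268", 3, 0
summary_url = "%s/applications/%s/stages/%d/%d/taskSummary?quantiles=0.05,0.5,0.95" % (
    BASE, app_id, stage_id, attempt_id)

print(apps_url)
print(summary_url)
```

Passing the parameters as a list of tuples keeps their order stable; urlencode also takes care of escaping values such as the GMT timestamps shown in the examples.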

 
