通过监听器获取 Spark thrift 服务器查询中读取的行数
Posted
技术标签:
【中文标题】通过监听器获取 Spark thrift 服务器查询中读取的行数【英文标题】:Get number of rows read in Spark thrift server query through Listener 【发布时间】:2021-09-03 13:00:16 【问题描述】:我正在尝试为我们的 ST 服务器构建一个监控系统。到目前为止,诸如记录查询、检索的行/红色和花费的时间之类的东西都可以。
我已经实现了一个自定义监听器,我能够毫无问题地检索查询和时间,监听 SparkListenerSQLExecutionStart
和 SparkListenerSQLExecutionEnd
类似这样的:
//Structure to hold executionId, query itself from the description and startTime
val queries = new HashMap[Long, (String, Long)]
def onExecutionStart(event: SparkListenerSQLExecutionStart): Unit =
logger.info(s"----------executionId: $event.executionId")
logger.info(s"----------description: $event.description")
logger.info(s"----------startTime: $event.time")
logger.info(s"----------metrics")
queries.put(event.executionId, (event.description, event.time))
def onExecutionEnd(event: SparkListenerSQLExecutionEnd): Unit =
logger.info("-----onExecutionEnd")
logger.info(s"---------executionId: $event.executionId")
logger.info(s"---------endTime: $event.time")
val executedPlanMetricsMap = queryExecution.executedPlan.metrics
printMetrics(executedPlanMetricsMap, "executedPlan")
//Help method to print metrics map
def printMetrics[A] (metricsMap: Map[String, A], metricType:String): Unit =
try
logger.info(s"---------metrics from $metricType with size: $metricsMap.size")
metricsMap.foreach case (k, v) => logger.info(s"---------------metric from $metricType " +
s"key: $k, value: $v")
catch
case e: Exception => logger.info(s"---------empty $metricType")
到目前为止,我只是将它们打印在日志上,只是为了检查是否可以检索我需要的值。这一个示例输出
INFO EventMonitorCustomListener: ---------executionId: 16
INFO EventMonitorCustomListener: ---------endTime: 1630665171840
INFO EventMonitorCustomListener: ---------query: select *
from <myDatabase>.<myTable>
INFO EventMonitorCustomListener: ---------metrics from executedPlan with size: 6
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numFiles, value: SQLMetric(id: 82, name: Some(number of files read), value: 1)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: pruningTime, value: SQLMetric(id: 85, name: Some(dynamic partition pruning time), value: -1)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: metadataTime, value: SQLMetric(id: 83, name: Some(metadata time), value: 121)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: filesSize, value: SQLMetric(id: 84, name: Some(size of files read), value: 36148)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numOutputRows, value: SQLMetric(id: 81, name: Some(number of output rows), value: 0)
INFO EventMonitorCustomListener: ---------------metric from executedPlan key: numPartitions, value: SQLMetric(id: 86, name: Some(number of partitions read), value: 1)
如您所见,我得到了查询,即从开始时间到结束时间所花费的总时间。但是当我检查执行计划中的指标时,值并不一致。 numOutputRows 应该是数字时为 0(特别是在我的示例中为 21)我也尝试使用 queryExecution.executedPlan.collectLeaves().head.metrics
,因为我发现 here 具有相同的结果
第一个问题:是否可以从SparkListenerSQLExecutionEnd
事件中检索输出行数?
如果没有,我可以使用键“internal.metrics.input.recordsRead”从累加器stageCompleted.stageInfo.accumulables
的SparkListenerStageCompleted
事件中检索它们。但在这种情况下,我无法将阶段或作业链接到 SQL 执行。即我得到了 jobid=4 和 stageId=5, executionId=12 但在任何情况下都没有任何价值可以将它们链接到另一个。
第二个问题:你知道一种方法可以知道执行属于哪个阶段或作业吗?
亲切的问候
编辑:
我找到了一种通过 jobStart.executionId 将 executionId 与 JobId 链接起来的方法。它在 docker 的 STS 中运行良好,但在我真正的 STS 中却不行。可能和 STS 配置有关?
【问题讨论】:
【参考方案1】:我们终于找到了为什么我的 jobStart 在属性中没有 executionId。这是因为增量收集设置为真。设置为 false 可以解决问题。
【讨论】:
以上是关于通过监听器获取 Spark thrift 服务器查询中读取的行数的主要内容,如果未能解决你的问题,请参考以下文章