如何检查结构化流中的 StreamingQuery 性能指标?

Posted

技术标签:

【中文标题】如何检查结构化流中的 StreamingQuery 性能指标?【英文标题】:How to check StreamingQuery performance metrics in Structured Streaming? 【发布时间】:2020-12-12 12:49:07 【问题描述】:

我想从流式查询中获取triggerExecution, inputRowsPerSecond, numInputRows, processedRowsPerSecond 之类的信息。

我使用rate 格式生成10 rows per second,并使用QueryProgressEvent 获取所有指标。

但是,在控制台中,在打印 QueryProgressEvent.inputRowsPerSecond 时,我得到了不正确的值,例如:625.0 666.66

有人可以解释为什么它会产生这样的价值吗?

代码和示例输出如下:

 spark.streams.addListener(new EventMetric())

val df = spark.readStream
.format("rate")
  .option("rowsPerSecond",10)
  .option("numPartitions",1)
  .load()
  .select($"value",$"timestamp")

df.writeStream
.outputMode("append")
.option("checkpointLocation", "/testjob")
.foreachBatch((batchDf: DataFrame, batchId: Long) =>
  println("rowcount value >>>> " + rowCountAcc.value)
  val outputDf = batchDf
  outputDf.write
    .format("console")
    .mode("append")
    .save()
)
.start()
.awaitTermination()

StreamingQueryListener:

class EventMetric extends StreamingQueryListener
  override def onQueryStarted(event: QueryStartedEvent): Unit = 
  

  override def onQueryProgress(event: QueryProgressEvent): Unit = 
    val p = event.progress
//    println("id : " + p.id)
    println("runId : "  + p.runId)
//    println("name : " + p.name)
    println("batchid : " + p.batchId)
    println("timestamp : " + p.timestamp)
    println("triggerExecution" + p.durationMs.get("triggerExecution"))
    println(p.eventTime)
    println("inputRowsPerSecond : " + p.inputRowsPerSecond)
    println("numInputRows : " + p.numInputRows)
    println("processedRowsPerSecond : " + p.processedRowsPerSecond)
    println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
  

  override def onQueryTerminated(event: QueryTerminatedEvent): Unit = 

  

输出 1:

runId : bc7f97c1-687f-4125-806a-dc573e006dcd
batchid : 164
timestamp : 2020-12-12T12:31:14.323Z
triggerExecution453

inputRowsPerSecond : 625.0
numInputRows : 10
processedRowsPerSecond : 22.075055187637968

输出 2:

runId : bc7f97c1-687f-4125-806a-dc573e006dcd
batchid : 168
timestamp : 2020-12-12T12:31:18.326Z
triggerExecution453

inputRowsPerSecond : 666.6666666666667
numInputRows : 10
processedRowsPerSecond : 22.075055187637968

编辑:

另外,如果输入速率是 625,那么对于这个实际上没有进行转换的作业,为什么处理的RowsPerSecond 如此之低?


更新 :: 输出漂亮的 JSON:

第 1 批:

runId : 16c82066-dea0-4e0d-8a1e-ad1df55ad516
batchid : 198
timestamp : 2020-12-13T16:23:14.331Z
triggerExecution422

inputRowsPerSecond : 666.6666666666667
numInputRows : 10
processedRowsPerSecond : 23.696682464454977
json : 
  "id" : "f8af5400-533c-4f7f-8b01-b365dc736735",
  "runId" : "16c82066-dea0-4e0d-8a1e-ad1df55ad516",
  "name" : null,
  "timestamp" : "2020-12-13T16:23:14.331Z",
  "batchId" : 198,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 666.6666666666667,
  "processedRowsPerSecond" : 23.696682464454977,
  "durationMs" : 
    "addBatch" : 47,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 0,
    "setOffsetRange" : 0,
    "triggerExecution" : 422,
    "walCommit" : 234
  ,
  "stateOperators" : [ ],
  "sources" : [ 
    "description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=1",
    "startOffset" : 212599,
    "endOffset" : 212600,
    "numInputRows" : 10,
    "inputRowsPerSecond" : 666.6666666666667,
    "processedRowsPerSecond" : 23.696682464454977
   ],
  "sink" : 
    "description" : "ForeachBatchSink"
  

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

第 2 批:

runId : 16c82066-dea0-4e0d-8a1e-ad1df55ad516
batchid : 191
timestamp : 2020-12-13T16:23:07.328Z
triggerExecution421

inputRowsPerSecond : 625.0
numInputRows : 10
processedRowsPerSecond : 23.752969121140143
json : 
  "id" : "f8af5400-533c-4f7f-8b01-b365dc736735",
  "runId" : "16c82066-dea0-4e0d-8a1e-ad1df55ad516",
  "name" : null,
  "timestamp" : "2020-12-13T16:23:07.328Z",
  "batchId" : 191,
  "numInputRows" : 10,
  "inputRowsPerSecond" : 625.0,
  "processedRowsPerSecond" : 23.752969121140143,
  "durationMs" : 
    "addBatch" : 62,
    "getBatch" : 0,
    "getEndOffset" : 0,
    "queryPlanning" : 16,
    "setOffsetRange" : 0,
    "triggerExecution" : 421,
    "walCommit" : 187
  ,
  "stateOperators" : [ ],
  "sources" : [ 
    "description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=1",
    "startOffset" : 212592,
    "endOffset" : 212593,
    "numInputRows" : 10,
    "inputRowsPerSecond" : 625.0,
    "processedRowsPerSecond" : 23.752969121140143
   ],
  "sink" : 
    "description" : "ForeachBatchSink"
  

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>

【问题讨论】:

【参考方案1】:

请记住,每秒生成 10 行并不能说明整个流式查询中的输入速率

在您的writeStream 调用中,您没有设置Trigger,这意味着流式查询在完成并且新数据可用时被触发。

现在,流式查询显然不需要整秒来读取这 10 秒,而是其中的一小部分。 “inputRowsPerSecond”更多的是衡量读取输入数据的速度。由于舍入问题,您也可以在不同批次中看到不同的值。检查输出中的“时间戳”字段,它不完全是一秒,但通常是 +- 几毫秒。

作业只需几毫秒即可读取数据,而且批次之间可能略有不同。在第 164 批次中,作业花费了 16 毫秒,在第 168 批次中,读取 10 条消息花费了 15 毫秒。

Batch 164 => 10 / 0,016sec = 625 messages per second

Batch 168 => 10 / 0,015ses = 666.6667 messages per second

processedRowsPerSecond 是根据triggerExecution 计算得出的

1000 / triggerExecution x 10msg = 1000 / 421 x 10msg = 23.752969 

【讨论】:

非常感谢您的解释。您能否解释一下我在上面编辑的查询? 我需要查看其他基于时间的指标。但我假设这个指标更多地是对实际吞吐量的衡量。由于只生成了 10 条消息,因此这个数字非常低。我不知道要计算这个值的精确分母是什么。 进展。 eventTime 地图打印为空。您可以指定哪些其他基于时间的指标?我会得到那个输出 如果你打印出类似p.json.pretty(可能没有漂亮)或event.json 嗨。我已经用 JSON 编辑了这篇文章。请检查。

以上是关于如何检查结构化流中的 StreamingQuery 性能指标?的主要内容,如果未能解决你的问题,请参考以下文章

C - 分隔输入流中的字符串

如何在火花结构化流式读取流中倒带 Kafka 偏移

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?

如何在 Spark 结构化流中保存通过水印丢弃的记录

如何在流中键入检查日期对象?

Azure 数据工厂:如何从数据流转换中的流中获取第一行