如何检查结构化流中的 StreamingQuery 性能指标?
Posted
技术标签:
【中文标题】如何检查结构化流中的 StreamingQuery 性能指标?【英文标题】:How to check StreamingQuery performance metrics in Structured Streaming? 【发布时间】:2020-12-12 12:49:07 【问题描述】:我想从流式查询中获取triggerExecution, inputRowsPerSecond, numInputRows, processedRowsPerSecond
之类的信息。
我使用rate
格式生成10 rows per second
,并使用QueryProgressEvent
获取所有指标。
但是,在控制台中,在打印 QueryProgressEvent.inputRowsPerSecond
时,我得到了不正确的值,例如:625.0
666.66
有人可以解释为什么它会产生这样的价值吗?
代码和示例输出如下:
spark.streams.addListener(new EventMetric())
val df = spark.readStream
.format("rate")
.option("rowsPerSecond",10)
.option("numPartitions",1)
.load()
.select($"value",$"timestamp")
df.writeStream
.outputMode("append")
.option("checkpointLocation", "/testjob")
.foreachBatch((batchDf: DataFrame, batchId: Long) =>
println("rowcount value >>>> " + rowCountAcc.value)
val outputDf = batchDf
outputDf.write
.format("console")
.mode("append")
.save()
)
.start()
.awaitTermination()
StreamingQueryListener:
class EventMetric extends StreamingQueryListener
override def onQueryStarted(event: QueryStartedEvent): Unit =
override def onQueryProgress(event: QueryProgressEvent): Unit =
val p = event.progress
// println("id : " + p.id)
println("runId : " + p.runId)
// println("name : " + p.name)
println("batchid : " + p.batchId)
println("timestamp : " + p.timestamp)
println("triggerExecution" + p.durationMs.get("triggerExecution"))
println(p.eventTime)
println("inputRowsPerSecond : " + p.inputRowsPerSecond)
println("numInputRows : " + p.numInputRows)
println("processedRowsPerSecond : " + p.processedRowsPerSecond)
println(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>")
override def onQueryTerminated(event: QueryTerminatedEvent): Unit =
输出 1:
runId : bc7f97c1-687f-4125-806a-dc573e006dcd
batchid : 164
timestamp : 2020-12-12T12:31:14.323Z
triggerExecution453
inputRowsPerSecond : 625.0
numInputRows : 10
processedRowsPerSecond : 22.075055187637968
输出 2:
runId : bc7f97c1-687f-4125-806a-dc573e006dcd
batchid : 168
timestamp : 2020-12-12T12:31:18.326Z
triggerExecution453
inputRowsPerSecond : 666.6666666666667
numInputRows : 10
processedRowsPerSecond : 22.075055187637968
编辑:
另外,如果输入速率是 625,那么对于这个实际上没有进行转换的作业,为什么处理的RowsPerSecond 如此之低?
更新 :: 输出漂亮的 JSON:
第 1 批:
runId : 16c82066-dea0-4e0d-8a1e-ad1df55ad516
batchid : 198
timestamp : 2020-12-13T16:23:14.331Z
triggerExecution422
inputRowsPerSecond : 666.6666666666667
numInputRows : 10
processedRowsPerSecond : 23.696682464454977
json :
"id" : "f8af5400-533c-4f7f-8b01-b365dc736735",
"runId" : "16c82066-dea0-4e0d-8a1e-ad1df55ad516",
"name" : null,
"timestamp" : "2020-12-13T16:23:14.331Z",
"batchId" : 198,
"numInputRows" : 10,
"inputRowsPerSecond" : 666.6666666666667,
"processedRowsPerSecond" : 23.696682464454977,
"durationMs" :
"addBatch" : 47,
"getBatch" : 0,
"getEndOffset" : 0,
"queryPlanning" : 0,
"setOffsetRange" : 0,
"triggerExecution" : 422,
"walCommit" : 234
,
"stateOperators" : [ ],
"sources" : [
"description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=1",
"startOffset" : 212599,
"endOffset" : 212600,
"numInputRows" : 10,
"inputRowsPerSecond" : 666.6666666666667,
"processedRowsPerSecond" : 23.696682464454977
],
"sink" :
"description" : "ForeachBatchSink"
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
第 2 批:
runId : 16c82066-dea0-4e0d-8a1e-ad1df55ad516
batchid : 191
timestamp : 2020-12-13T16:23:07.328Z
triggerExecution421
inputRowsPerSecond : 625.0
numInputRows : 10
processedRowsPerSecond : 23.752969121140143
json :
"id" : "f8af5400-533c-4f7f-8b01-b365dc736735",
"runId" : "16c82066-dea0-4e0d-8a1e-ad1df55ad516",
"name" : null,
"timestamp" : "2020-12-13T16:23:07.328Z",
"batchId" : 191,
"numInputRows" : 10,
"inputRowsPerSecond" : 625.0,
"processedRowsPerSecond" : 23.752969121140143,
"durationMs" :
"addBatch" : 62,
"getBatch" : 0,
"getEndOffset" : 0,
"queryPlanning" : 16,
"setOffsetRange" : 0,
"triggerExecution" : 421,
"walCommit" : 187
,
"stateOperators" : [ ],
"sources" : [
"description" : "RateStreamV2[rowsPerSecond=10, rampUpTimeSeconds=0, numPartitions=1",
"startOffset" : 212592,
"endOffset" : 212593,
"numInputRows" : 10,
"inputRowsPerSecond" : 625.0,
"processedRowsPerSecond" : 23.752969121140143
],
"sink" :
"description" : "ForeachBatchSink"
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>
【问题讨论】:
【参考方案1】:请记住,每秒生成 10 行并不能说明整个流式查询中的输入速率。
在您的writeStream
调用中,您没有设置Trigger
,这意味着流式查询在完成并且新数据可用时被触发。
现在,流式查询显然不需要整秒来读取这 10 秒,而是其中的一小部分。 “inputRowsPerSecond”更多的是衡量读取输入数据的速度。由于舍入问题,您也可以在不同批次中看到不同的值。检查输出中的“时间戳”字段,它不完全是一秒,但通常是 +- 几毫秒。
作业只需几毫秒即可读取数据,而且批次之间可能略有不同。在第 164 批次中,作业花费了 16 毫秒,在第 168 批次中,读取 10 条消息花费了 15 毫秒。
Batch 164 => 10 / 0,016sec = 625 messages per second
Batch 168 => 10 / 0,015ses = 666.6667 messages per second
processedRowsPerSecond
是根据triggerExecution
计算得出的
1000 / triggerExecution x 10msg = 1000 / 421 x 10msg = 23.752969
【讨论】:
非常感谢您的解释。您能否解释一下我在上面编辑的查询? 我需要查看其他基于时间的指标。但我假设这个指标更多地是对实际吞吐量的衡量。由于只生成了 10 条消息,因此这个数字非常低。我不知道要计算这个值的精确分母是什么。 进展。 eventTime 地图打印为空。您可以指定哪些其他基于时间的指标?我会得到那个输出 如果你打印出类似p.json.pretty
(可能没有漂亮)或event.json
。
嗨。我已经用 JSON 编辑了这篇文章。请检查。以上是关于如何检查结构化流中的 StreamingQuery 性能指标?的主要内容,如果未能解决你的问题,请参考以下文章