如何使用 Python 在 Spark Structured Streaming 中查看特定指标
Posted
技术标签:
【中文标题】如何使用 Python 在 Spark Structured Streaming 中查看特定指标【英文标题】:How to see a particular metric in Spark Structured Streaming with Python 【发布时间】:2022-01-20 23:41:04 【问题描述】:我对 Spark 和 Python 非常陌生。我正在尝试查看 Spark Structured Streaming 中的任何指标(例如,processedRowsPerSecond
),但我不知道该怎么做。
我在“Structured Streaming Programming Guide”中读到,使用 print(query.lastProgress)
您可以直接获取活动查询的当前状态和指标,但如果我编写它,我只能获取 @987654323 @ 一次。我的代码的最后一部分如下:
query = windowedCountsDF\
.writeStream\
.outputMode('update')\
.option("truncate", "false") \
.format('console') \
.queryName("numbers") \
.start()
print(query.lastProgress)
query.awaitTermination()
任何关于如何做到这一点的想法都将受到高度赞赏。
【问题讨论】:
【参考方案1】:尝试:
while query.isActive:
print("\n")
print(query.status)
print(query.lastProgress)
time.sleep(30)
query.awaitTermination()
阻止 query.lastProgress
。
【讨论】:
正如目前所写,您的答案尚不清楚。请edit 添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。你可以找到更多关于如何写好答案的信息in the help center。 谢谢亚历克斯。只有一个问题:如果我想访问 lastProgress 中的特定指标(我认为 lastProgress 是一本字典),我必须做什么?【参考方案2】:这实际上取决于您想对该指标做什么。您的问题是您正在调用query.awaitTermination()
,它会阻止任何其他活动。如果您想收集指标,那么您需要实现自己的等待循环,而不是调用query.awaitTermination()
,如下所示:
query = ...
while not query.exception():
if query.lastProgress:
print(query.lastProgress) # do something with your data
time.sleep(10) # wait 10 seconds..
【讨论】:
谢谢亚历克斯。我已经实现了您的代码,但收到以下错误消息:文件“numbers.py”,第 42 行,在lastProgress
函数。
对不起,Alex,但它仍然不起作用。我收到此错误消息:21/12/18 15:56:07 ERROR streaming.MicroBatchExecution: Query numbers [id = 0edfdfa9-5cb0-4373-99b7-1a931d6883e0, runId = 83002c13-1260-4317-8043-a025820a514f] 终止于错误 java.lang.IllegalStateException:无法在停止的 SparkContext 上调用方法。
***.com/questions/36884845/…以上是关于如何使用 Python 在 Spark Structured Streaming 中查看特定指标的主要内容,如果未能解决你的问题,请参考以下文章
Apache Spark:如何在Python 3中使用pyspark
如何使用 python 在 Spark 中转置 DataFrame 而不进行聚合
如何使用python将Spark数据写入ElasticSearch