如何使用 Python 在 Spark Structured Streaming 中查看特定指标

Posted

技术标签:

【中文标题】如何使用 Python 在 Spark Structured Streaming 中查看特定指标【英文标题】:How to see a particular metric in Spark Structured Streaming with Python 【发布时间】:2022-01-20 23:41:04 【问题描述】:

我对 Spark 和 Python 非常陌生。我正在尝试查看 Spark Structured Streaming 中的任何指标(例如,processedRowsPerSecond),但我不知道该怎么做。

我在“Structured Streaming Programming Guide”中读到,使用 print(query.lastProgress) 您可以直接获取活动查询的当前状态和指标,但如果我编写它,我只能获取 @987654323 @ 一次。我的代码的最后一部分如下:

query = windowedCountsDF\
    .writeStream\
    .outputMode('update')\
    .option("truncate", "false") \
    .format('console') \
    .queryName("numbers") \
    .start()

print(query.lastProgress)

query.awaitTermination()

任何关于如何做到这一点的想法都将受到高度赞赏。

【问题讨论】:

【参考方案1】:

尝试:

while query.isActive:
    print("\n")
    print(query.status)
    print(query.lastProgress)
    time.sleep(30)

query.awaitTermination() 阻止 query.lastProgress

【讨论】:

正如目前所写,您的答案尚不清楚。请edit 添加其他详细信息,以帮助其他人了解这如何解决所提出的问题。你可以找到更多关于如何写好答案的信息in the help center。 谢谢亚历克斯。只有一个问题:如果我想访问 lastProgress 中的特定指标(我认为 lastProgress 是一本字典),我必须做什么?【参考方案2】:

这实际上取决于您想对该指标做什么。您的问题是您正在调用query.awaitTermination(),它会阻止任何其他活动。如果您想收集指标,那么您需要实现自己的等待循环,而不是调用query.awaitTermination(),如下所示:

query = ...

while not query.exception():
  if query.lastProgress:
    print(query.lastProgress) # do something with your data
  time.sleep(10) # wait 10 seconds..

【讨论】:

谢谢亚历克斯。我已经实现了您的代码,但收到以下错误消息:文件“numbers.py”,第 42 行,在 recentProgress = query.recentProgress() TypeError: 'list' object is not callable 21/12/ 18 14:07:32 ERROR streaming.MicroBatchExecution:查询编号 [id = ee2bc067-90c4-4d91-9c5b-dc00388f68b4,runId = 619509e1-4e81-4a78-a553-d75d6c0521a7] 以错误 java.lang.IllegalStateException 终止:无法调用方法在停止的 SparkContext 上。 对不起,这是代码中的错误,只是从文档中复制了一段,忘记了它是类的成员,而不是函数。最好使用lastProgress函数。 对不起,Alex,但它仍然不起作用。我收到此错误消息:21/12/18 15:56:07 ERROR streaming.MicroBatchExecution: Query numbers [id = 0edfdfa9-5cb0-4373-99b7-1a931d6883e0, runId = 83002c13-1260-4317-8043-a025820a514f] 终止于错误 java.lang.IllegalStateException:无法在停止的 SparkContext 上调用方法。 ***.com/questions/36884845/…

以上是关于如何使用 Python 在 Spark Structured Streaming 中查看特定指标的主要内容,如果未能解决你的问题,请参考以下文章

使用python在spark中加载pcap文件

Apache Spark:如何在Python 3中使用pyspark

如何使用 python 在 Spark 中转置 DataFrame 而不进行聚合

如何使用python将Spark数据写入ElasticSearch

如何使用 scala/python/spark 在 CSV 文件中创建新的时间戳(分钟)列

如何在 python 中使用 Spark Data frame 和 GroupBy 派生 Percentile