pyspark 结构化流不使用 query.lastProgress 或其他标准指标更新查询指标

Posted

技术标签:

【中文标题】pyspark 结构化流不使用 query.lastProgress 或其他标准指标更新查询指标【英文标题】:pyspark structured streaming not updating query metrics with query.lastProgress or other standard metrics 【发布时间】:2020-11-18 01:41:15 【问题描述】:

我正在尝试将日志记录添加到我的 pyspark 结构化流应用程序中,以便查看每个处理的微批处理的进度和统计信息。 writestream 方法使用 foreach 编写器将数据帧中的行写入 postgres 数据库。我正在使用.lastProgress 和 pyspark 提供的其他标准指标进行日志记录。 writestream 方法和我的日志尝试如下所示。

query_1 = eventsDF \
    .writeStream \
    .foreach(writer) \
    .outputMode("append") \
    .option("checkpointLocation", "/tmp/checkpoint_a/") \
    .trigger(processingTime="5 seconds") \
    .start()


query_progress =  query_1.lastProgress
print("progress ", query_progress)
print("status ", query_1.status)
print("active ", query_1.isActive)

query_1.awaitTermination()

我的第一个循环的结果是:

progress  None
status  'message': 'Initializing sources', 'isDataAvailable': False, 'isTriggerActive': False
active  True

但是,随着事件数据的到达,进一步处理的批处理不会产生更多的日志消息。我希望在流式作业中处理每个微批处理后都会发出日志消息。

感谢任何建议或指导。谢谢。

【问题讨论】:

【参考方案1】:

startawaitTermination 之间的所有代码只执行一次。只有loadstart 之间的代码会在每个查询触发器上连续执行。

根据“Spark - 权威指南”一书,这种监控方式旨在在您的应用程序内部运行。但是,对于独立应用程序,您通常没有附加外壳来运行任意代码。在书中,他们建议“通过实现一个监控服务器来公开 [查询] 状态,例如一个小型 HTTP 服务器,它监听端口并在收到请求时返回 query.status

因此,您需要创建一个专用的可运行线程,该线程会频繁调用查询的监控 API。我真的不熟悉 Python,但它基本上如下所示:

# import the threading module 
import threading  
  
class thread(threading.Thread):  
    def __init__(self, query):  
        threading.Thread.__init__(self)  
        self.query = query  
  
        # helper function to execute the threads 
    def run(self):  
        print("progress ", query.lastProgress);  

完成此操作后,您需要将其放在startawaitTermination 之间:

query_1 = eventsDF \
    [...]
    .start()

monitoring = thread(query_1)

query_1.awaitTermination()

您也可以使用while(query_1.isActive) 循环查询的状态,而不是专用线程。

对于 Scala 用户:

How to get progress of streaming query after awaitTermination?

【讨论】:

以上是关于pyspark 结构化流不使用 query.lastProgress 或其他标准指标更新查询指标的主要内容,如果未能解决你的问题,请参考以下文章

使用 pyspark 处理结构数据类型

如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?

在结构化流 API (pyspark) 中使用 redshift 作为 readStream 的 JDBC 源

设置套接字流不起作用

Pyspark 结构化流处理

对多租户应用程序使用授权码流不起作用