PySpark 结构化流式处理:将查询的输出传递到 API 端点

Posted

技术标签:

【中文标题】PySpark 结构化流式处理:将查询的输出传递到 API 端点【英文标题】:PySpark Structured Streaming: Pass output of Query to API endpoint 【发布时间】:2018-01-03 01:53:52 【问题描述】:

我在结构化流中有以下数据框:

TimeStamp|Room|Temperature|
00:01:29 | 1  | 55        | 
00:01:34 | 2  | 51        | 
00:01:36 | 1  | 56        | 
00:02:03 | 2  | 49        | 

我正在尝试检测温度何时低于某个温度(在本例中为 50)。我有查询的那部分工作。现在,我需要通过这样的 POST 调用将此信息传递给 API 端点:'/api/lowTemperature/' 带有时间戳和请求正文中的温度。因此,在上述情况下,我需要发送:

POST /api/lowTemperature/2
BODY:  "TimeStamp":"00:02:03",
       "Temperature":"49" 

知道如何使用 PySpark 实现这一目标吗?

我想到的一种方法是使用自定义流接收器,但是,我似乎找不到任何有关使用 Python 实现此目的的文档。

【问题讨论】:

【参考方案1】:

好消息,因为最近为 ForeachWriter 添加了对 Python 的支持。我在 Python 中为 REST 和 Azure 事件网格创建了一个,它相当简单。 (基本)文档可以在这里找到:https://docs.databricks.com/spark/latest/structured-streaming/foreach.html#using-python

【讨论】:

【参考方案2】:

在我最初回复时,ForeachWriter 仅支持 Java/Scala,但现在它也支持 Python。

确保您阅读了有关执行语义的部分并了解如何避免重复的 API 调用(如果这是一个问题)。

【讨论】:

以上是关于PySpark 结构化流式处理:将查询的输出传递到 API 端点的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 结构化流不使用 query.lastProgress 或其他标准指标更新查询指标

使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出

使用 pyspark 流式传输到 HBase

PySpark 处理流数据并将处理后的数据保存到文件

发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询

PySpark Hive 查询未显示输出