PySpark 结构化流式处理:将查询的输出传递到 API 端点
Posted
技术标签:
【中文标题】PySpark 结构化流式处理:将查询的输出传递到 API 端点【英文标题】:PySpark Structured Streaming: Pass output of Query to API endpoint 【发布时间】:2018-01-03 01:53:52 【问题描述】:我在结构化流中有以下数据框:
TimeStamp|Room|Temperature|
00:01:29 | 1 | 55 |
00:01:34 | 2 | 51 |
00:01:36 | 1 | 56 |
00:02:03 | 2 | 49 |
我正在尝试检测温度何时低于某个温度(在本例中为 50)。我有查询的那部分工作。现在,我需要通过这样的 POST 调用将此信息传递给 API 端点:'/api/lowTemperature/' 带有时间戳和请求正文中的温度。因此,在上述情况下,我需要发送:
POST /api/lowTemperature/2
BODY: "TimeStamp":"00:02:03",
"Temperature":"49"
知道如何使用 PySpark 实现这一目标吗?
我想到的一种方法是使用自定义流接收器,但是,我似乎找不到任何有关使用 Python 实现此目的的文档。
【问题讨论】:
【参考方案1】:好消息,因为最近为 ForeachWriter 添加了对 Python 的支持。我在 Python 中为 REST 和 Azure 事件网格创建了一个,它相当简单。 (基本)文档可以在这里找到:https://docs.databricks.com/spark/latest/structured-streaming/foreach.html#using-python
【讨论】:
【参考方案2】:在我最初回复时,ForeachWriter 仅支持 Java/Scala,但现在它也支持 Python。
确保您阅读了有关执行语义的部分并了解如何避免重复的 API 调用(如果这是一个问题)。
【讨论】:
以上是关于PySpark 结构化流式处理:将查询的输出传递到 API 端点的主要内容,如果未能解决你的问题,请参考以下文章
pyspark 结构化流不使用 query.lastProgress 或其他标准指标更新查询指标
使用 Python SDK 进行数据流流式处理:将 PubSub 消息转换为 BigQuery 输出
发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询