如何在 pyspark 的结构化流作业中运行地图转换

Posted

技术标签:

【中文标题】如何在 pyspark 的结构化流作业中运行地图转换【英文标题】:how to run map transformation in a structured streaming job in pyspark 【发布时间】:2020-10-07 02:02:50 【问题描述】:

我正在尝试使用进行 REST API 调用的 map() 转换来设置结构化流式传输作业。以下是详细信息:

(1)
df=spark.readStream.format('delta') \
.option("maxFilesPerTrigger", 1000) \
.load(f'file_location') 

(2)
respData=df.select("resource", "payload").rdd.map(lambda row: put_resource(row[0], row[1])).collect()
respDf=spark.createDataFrame(respData, ["resource", "status_code", "reason"])

(3)
respDf.writeStream \
.trigger(once=True) \
.outputMode("append") \
.format("delta") \
.option("path", f'file_location/Response') \
.option("checkpointLocation", f'file_location/Response/Checkpoints') \
.start()

但是,我收到一个错误:必须在步骤 (2) 中使用 writeStream.start() 执行带有流式源的查询。

任何帮助将不胜感激。谢谢。

【问题讨论】:

【参考方案1】:

您还必须在 df 上执行您的流 意思是 df.writeStream.start()..

这里有一个类似的帖子:

Queries with streaming sources must be executed with writeStream.start();

【讨论】:

所以你的意思是不可能在 readStream 和 writeStream 之间的 rdd 上运行一些映射转换? 您不只是在运行地图转换。您正在收集结果并将其用作输入来创建新的数据框。事实上,你有 2 个正在运行的流,你应该同时启动它们。如果您想对流数据帧进行转换,您可以执行 spark.readStream..load().map().writeStream.start

以上是关于如何在 pyspark 的结构化流作业中运行地图转换的主要内容,如果未能解决你的问题,请参考以下文章

哪个选项使用 pyspark 提供最佳性能?使用地图进行 UDF 或 RDD 处理?

如何使用 Python 或 Pyspark 或 scala 在数据块中获取笔记本的作业运行结果日志

Pyspark:如何在 Yarn 集群上运行作业时对多个文件使用 --files 标签

如何阻止 Spark 结构化流每次都列出 S3 存储桶中的所有文件

在结构化流 API (pyspark) 中使用 redshift 作为 readStream 的 JDBC 源

如何检查 Dataproc 上 pyspark 作业的每个执行程序/节点内存使用指标?