使用 Pyspark 从结构化流数据帧构建 Spark ML 管道模型

Posted

技术标签:

【中文标题】使用 Pyspark 从结构化流数据帧构建 Spark ML 管道模型【英文标题】:Building Spark ML pipeline model from structured streaming DataFrame using Pyspark 【发布时间】:2018-08-08 20:01:21 【问题描述】:

我是数据科学的新手,正在寻求帮助。

我想从结构化流数据帧构建 Spark ML 管道模型,但遇到了一些错误。请看下面的代码 sn-p 和错误信息。

注意:我们已经离线训练了我们的模型,只是想从结构化流数据帧构建评分模型。

代码:

pipelineModel= PipelineModel.load('/model/path/')
scoringDf = pipelineModel.transform(streamingDf) 

错误信息:

Py4JJavaError: An error occurred while calling o1910.transform.
: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
Kafka

我们使用的是 DSE Cassandra v 6.0,它具有 Spark 2.2.1。那么有人会建议将预训练的 PipelineML 对象转换为 Spark 2.2 中的流数据帧的问题吗?

请给我一些意见。

【问题讨论】:

Spark Structured Streaming and Spark-Ml Regression的可能重复 【参考方案1】:

您正在尝试在流式传输结束之前进行转换。流是一个不同的概念,在这里您假设您正在处理批处理数据,您可以一步完成所有事情。

尝试通过在调用转换方法之前添加 your-streaming-context-variable.awaitTermination() 来等待流计算结束。

您可以阅读here 了解 Spark 的流式计算。

【讨论】:

如何更改代码以解决该错误消息? 在您调用转换函数之前,正如我在回答中解释的那样。在此示例中,在 scoreDf = pipelineModel.transform(streamingDf) 行之前,对流上下文变量调用 awaitTermination() 函数。 你能添加一个代码 sn-p 来说明如何解决这个问题吗?他的代码非常简洁——可能是你所说的,但也可能是其他一些问题(在管道中使用 VectorAssembler 或 OneHotEncoder)。添加一个代码sn-p会帮助他测试你的答案是否正确

以上是关于使用 Pyspark 从结构化流数据帧构建 Spark ML 管道模型的主要内容,如果未能解决你的问题,请参考以下文章

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

使用 pyspark 从 s3 位置读取镶木地板文件的文件夹到 pyspark 数据帧

火花流到pyspark json文件中的数据帧

在结构化流 API (pyspark) 中使用 redshift 作为 readStream 的 JDBC 源

无法将日志功能应用于 pyspark 数据帧

如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?