PySpark:将 PythonRDD 附加/合并到 PySpark 数据帧

Posted

技术标签:

【中文标题】PySpark:将 PythonRDD 附加/合并到 PySpark 数据帧【英文标题】:PySpark: append/merge PythonRDD to a PySpark dataframe 【发布时间】:2016-09-16 17:55:48 【问题描述】:

我正在使用以下代码创建一个聚类模型,然后将每条记录分类到某个聚类:

from pyspark.mllib.clustering import KMeans
from pyspark.mllib.linalg import Vectors

spark_df = sqlContext.createDataFrame(pandas_df)
rdd = spark_df.rdd.map(lambda data: Vectors.dense([float(c) for c in data]))
model = KMeans.train(rdd, 2, maxIterations=10,  initializationMode="random")

result = model.predict(red)

如何将预测结果作为附加列附加回 spark_df?谢谢!

【问题讨论】:

为什么不首先使用ml 我无法将 spark_df(数据框)转换为 spark 数据集。如果我只是将数据框与 ml 一起使用,它将无法正常工作。关于如何将数据框转换为数据集的任何建议?谢谢! 如果我使用 ml: model = kmeans.fit(spark_df),我得到错误:AnalysisException: u"cannot resolve 'features' given input columns: [field_1, field_2, ... field10 ];"所以在我看来我不能直接使用 spark_df @SandipanDey withcolumn 仅适用于现有列的派生列。在这里它没有给出预期的结果。 @Edamame 你能解决这个问题吗?任何帮助都会很棒... 【参考方案1】:

pyspark.mllib.clustering.KMeansModel 是可直接在 PySpark 转换中使用的稀有模型之一,因此您可以简单地将 mappredict 结合使用:

rdd.map(lambda point: (model.predict(point), point))

在一般情况下,zip 是适合该工作的工具:

rdd.zip(model.predict(rdd))

【讨论】:

zip 给出一个包含行和预测值的元组。我们如何将它作为列“new_col”添加到数据帧本身,以便我们可以使用 saveastable( )。 @venkat 这是mllib 不是ml。对于ml 模型,只需使用transform 方法。 我说的是 mllib 而不是 ml,我如何获取带有额外预测列的数据帧并写回 db? @venkat mllib 不使用DataFrames。如果您希望 DataFrame 作为 n 输出,您必须按照标准步骤转换您的 RDDml 开箱即用。

以上是关于PySpark:将 PythonRDD 附加/合并到 PySpark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章

pyspark对应的scala代码PythonRDD对象

使用 pyspark 在循环中附加 Spark DataFrames 的有效方法

PySpark:将字典数据附加到 PySpark DataFrame

pyspark错误记录1: Py4JJavaError

PythonRDD[1] at RDD at PythonRDD.scala:53

PySpark Dataframe:将一个单词附加到列的每个值