PySpark:将 PythonRDD 附加/合并到 PySpark 数据帧
Posted
技术标签:
【中文标题】PySpark:将 PythonRDD 附加/合并到 PySpark 数据帧【英文标题】:PySpark: append/merge PythonRDD to a PySpark dataframe 【发布时间】:2016-09-16 17:55:48 【问题描述】:我正在使用以下代码创建一个聚类模型,然后将每条记录分类到某个聚类:
from pyspark.mllib.clustering import KMeans
from pyspark.mllib.linalg import Vectors
spark_df = sqlContext.createDataFrame(pandas_df)
rdd = spark_df.rdd.map(lambda data: Vectors.dense([float(c) for c in data]))
model = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random")
result = model.predict(red)
如何将预测结果作为附加列附加回 spark_df?谢谢!
【问题讨论】:
为什么不首先使用ml
?
我无法将 spark_df(数据框)转换为 spark 数据集。如果我只是将数据框与 ml 一起使用,它将无法正常工作。关于如何将数据框转换为数据集的任何建议?谢谢!
如果我使用 ml: model = kmeans.fit(spark_df),我得到错误:AnalysisException: u"cannot resolve 'features
' given input columns: [field_1, field_2, ... field10 ];"所以在我看来我不能直接使用 spark_df
@SandipanDey withcolumn 仅适用于现有列的派生列。在这里它没有给出预期的结果。
@Edamame 你能解决这个问题吗?任何帮助都会很棒...
【参考方案1】:
pyspark.mllib.clustering.KMeansModel
是可直接在 PySpark 转换中使用的稀有模型之一,因此您可以简单地将 map
与 predict
结合使用:
rdd.map(lambda point: (model.predict(point), point))
在一般情况下,zip
是适合该工作的工具:
rdd.zip(model.predict(rdd))
【讨论】:
zip 给出一个包含行和预测值的元组。我们如何将它作为列“new_col”添加到数据帧本身,以便我们可以使用 saveastable( )。 @venkat 这是mllib
不是ml
。对于ml
模型,只需使用transform
方法。
我说的是 mllib 而不是 ml,我如何获取带有额外预测列的数据帧并写回 db?
@venkat mllib
不使用DataFrames
。如果您希望 DataFrame
作为 n 输出,您必须按照标准步骤转换您的 RDD
。 ml
开箱即用。以上是关于PySpark:将 PythonRDD 附加/合并到 PySpark 数据帧的主要内容,如果未能解决你的问题,请参考以下文章
使用 pyspark 在循环中附加 Spark DataFrames 的有效方法
PySpark:将字典数据附加到 PySpark DataFrame