将 Pyspark Python k-means 模型预测插入具有原始 RDD 项和特征的 DF

Posted

技术标签:

【中文标题】将 Pyspark Python k-means 模型预测插入具有原始 RDD 项和特征的 DF【英文标题】:Insert Pyspark Python k-means model prediction into DF with original RDD items with features 【发布时间】:2019-09-29 22:17:15 【问题描述】:

我有一个带有 ID 和功能的 Rdd。简而言之,我正在尝试输出与标签(“id”)与它所属的簇号(0、1、2 等)相匹配的东西

rdd 数据集中的三行看起来像这样(虽然它更像是 100 行,第一项是字符串,其余的是浮点数):

rdd = ["id1",2,12,3.4,19], ["id2",4,17,3.6,40] ["id3",5,14,2.3,47]...

我通过创建一个只有特征的 RDD 来运行这个模型的特征(id 破坏了直接在原始 RDD 上运行的模型):

feature_rdd = [2,12,3.4,19], [4,17,3.6,40] [5,14,2.3,47]...

model = KMeans.train(parsedData, num_clusters, maxIterations=max_iterations, initializationMode=initialization_mode, seed=seed)

我预测使用:

predictions = model.predict(feature_rdd)

并得到一个看起来像这样的RDD,对应于该行的预测的簇号:

[0, 0, 1, 2, 0...]

我想以某种方式将 id 与预测结合起来,这样我就可以报告哪些 ID 属于哪个集群。我找不到这样做的好方法。我尝试合并这两个 RDD,但随后它只在新 Rdd 中提供了另一个项目,而不是将每个预测与每个 ID 配对。我也尝试过转换两个数据帧,但是在变量的混合转换方面遇到了问题。我正在寻找类似于数据框的东西:

*****************
* id  * cluster *
*****************
* "id1" *    0  * 
* "id2" *    0  *
* "id3" *    1  *
*****************

或者只是以某种方式配对并导出到列表等。

["id1", 0],["id2", 1]...

但非常感谢任何有关如何解决此问题的帮助。

【问题讨论】:

【参考方案1】:

您可以使用map 获取具有特征的rdd 的第一个条目,然后使用zip 添加预测的集群。您可以将生成的rdd 转换为createDataFrame。下面是一个示例,希望对您有所帮助。

from pyspark.sql.types import StructType, StructField, StringType, IntegerType

data = sc.parallelize(
[
    ('id1',1,2,3),
    ('id2',2,3,4),
    ('id3',3,4,5)
])
predictions = sc.parallelize(
[
    (1),
    (0),
    (1)
])

# zip the id's (first element of each entry in the rdd) and the predictions into one rdd.
id_and_predictions = data.map(lambda x: x[0]).zip(predictions)

# Convert to DataFrame
schema = StructType([
    StructField('id',StringType()), StructField('cluster',IntegerType())
])
df = sqlContext.createDataFrame(id_and_predictions,schema)
df.show()

输出:

+---+-------+
| id|cluster|
+---+-------+
|id1|      1|
|id2|      0|
|id3|      1|
+---+-------+

【讨论】:

以上是关于将 Pyspark Python k-means 模型预测插入具有原始 RDD 项和特征的 DF的主要内容,如果未能解决你的问题,请参考以下文章

pyspark 中的 K-means 在 jupyter notebook 中无限运行,在 zeppelin notebook 中运行良好

在 Apache Spark Python 中自定义 K-means 的距离公式

Pyspark Dataframes:创建要在 python 中的聚类中使用的特征列

Pyspark - ValueError:无法将字符串转换为浮点数/浮点()的无效文字

如何使用PySpark将SparseVector中的前X个单词转换为字符串数组

Python 使用k-means方法将列表中相似的句子聚为一类