将 Pyspark Python k-means 模型预测插入具有原始 RDD 项和特征的 DF
Posted
技术标签:
【中文标题】将 Pyspark Python k-means 模型预测插入具有原始 RDD 项和特征的 DF【英文标题】:Insert Pyspark Python k-means model prediction into DF with original RDD items with features 【发布时间】:2019-09-29 22:17:15 【问题描述】:我有一个带有 ID 和功能的 Rdd。简而言之,我正在尝试输出与标签(“id”)与它所属的簇号(0、1、2 等)相匹配的东西
rdd 数据集中的三行看起来像这样(虽然它更像是 100 行,第一项是字符串,其余的是浮点数):
rdd = ["id1",2,12,3.4,19], ["id2",4,17,3.6,40] ["id3",5,14,2.3,47]...
我通过创建一个只有特征的 RDD 来运行这个模型的特征(id 破坏了直接在原始 RDD 上运行的模型):
feature_rdd = [2,12,3.4,19], [4,17,3.6,40] [5,14,2.3,47]...
model = KMeans.train(parsedData, num_clusters, maxIterations=max_iterations, initializationMode=initialization_mode, seed=seed)
我预测使用:
predictions = model.predict(feature_rdd)
并得到一个看起来像这样的RDD,对应于该行的预测的簇号:
[0, 0, 1, 2, 0...]
我想以某种方式将 id 与预测结合起来,这样我就可以报告哪些 ID 属于哪个集群。我找不到这样做的好方法。我尝试合并这两个 RDD,但随后它只在新 Rdd 中提供了另一个项目,而不是将每个预测与每个 ID 配对。我也尝试过转换两个数据帧,但是在变量的混合转换方面遇到了问题。我正在寻找类似于数据框的东西:
*****************
* id * cluster *
*****************
* "id1" * 0 *
* "id2" * 0 *
* "id3" * 1 *
*****************
或者只是以某种方式配对并导出到列表等。
["id1", 0],["id2", 1]...
但非常感谢任何有关如何解决此问题的帮助。
【问题讨论】:
【参考方案1】:您可以使用map
获取具有特征的rdd 的第一个条目,然后使用zip
添加预测的集群。您可以将生成的rdd
转换为createDataFrame
。下面是一个示例,希望对您有所帮助。
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
data = sc.parallelize(
[
('id1',1,2,3),
('id2',2,3,4),
('id3',3,4,5)
])
predictions = sc.parallelize(
[
(1),
(0),
(1)
])
# zip the id's (first element of each entry in the rdd) and the predictions into one rdd.
id_and_predictions = data.map(lambda x: x[0]).zip(predictions)
# Convert to DataFrame
schema = StructType([
StructField('id',StringType()), StructField('cluster',IntegerType())
])
df = sqlContext.createDataFrame(id_and_predictions,schema)
df.show()
输出:
+---+-------+
| id|cluster|
+---+-------+
|id1| 1|
|id2| 0|
|id3| 1|
+---+-------+
【讨论】:
以上是关于将 Pyspark Python k-means 模型预测插入具有原始 RDD 项和特征的 DF的主要内容,如果未能解决你的问题,请参考以下文章
pyspark 中的 K-means 在 jupyter notebook 中无限运行,在 zeppelin notebook 中运行良好
在 Apache Spark Python 中自定义 K-means 的距离公式
Pyspark Dataframes:创建要在 python 中的聚类中使用的特征列
Pyspark - ValueError:无法将字符串转换为浮点数/浮点()的无效文字