PySpark 中的 KMeans 聚类

Posted

技术标签:

【中文标题】PySpark 中的 KMeans 聚类【英文标题】:KMeans clustering in PySpark 【发布时间】:2018-05-15 02:33:15 【问题描述】:

我有一个包含许多列的 spark 数据框“mydataframe”。我试图只在两列上运行 kmeans:lat 和 long(纬度和经度),使用它们作为简单值)。我想仅基于这 2 列提取 7 个集群,然后我想将集群分配附加到我的原始数据框。我试过了:

from numpy import array
from math import sqrt
from pyspark.mllib.clustering import KMeans, KMeansModel

# Prepare a data frame with just 2 columns:
data = mydataframe.select('lat', 'long')
data_rdd = data.rdd  # needs to be an RDD
data_rdd.cache()

# Build the model (cluster the data)
clusters = KMeans.train(data_rdd, 7, maxIterations=15, initializationMode="random")

但我在一段时间后得到一个错误:

org.apache.spark.SparkException:作业因阶段故障而中止:阶段 5191.0 中的任务 1 失败 4 次,最近一次失败:阶段 5191.0 中丢失任务 1.3(TID 260738、10.19.211.69、执行程序 1):org .apache.spark.api.python.PythonException: Traceback(最近一次调用最后一次)

我已尝试分离并重新连接集群。结果相同。我做错了什么?

【问题讨论】:

在地理数据上,使用Haversine距离,不要使用kmeans。 @Anony-Mousse -哇,谢谢!您会推荐什么更适合经纬度聚类? Haversine 距离和 OPTICS 聚类。 【参考方案1】:

因为,基于another recent question of yours,我猜您是 Spark 集群的第一步(您甚至导入了 sqrtarray,但从未使用过它们,可能是因为它就像在 @ 987654322@),让我在更一般的层面上提供建议,而不是在您在这里提出的具体问题中提供建议(希望也可以让您免于随后打开 3-4 个问题,试图将您的集群分配重新放入您的数据框中).. .

自从

    您的数据已经在数据框中

    您想将集群成员重新附加到您的初始 数据框

您没有理由恢复到 RDD 并使用 (soon to be deprecated) MLlib 包;使用(现在推荐的)ML 包,您将更轻松、优雅、高效地完成工作,该包直接与数据帧一起使用。

第 0 步 - 制作一些类似于您的玩具数据:

spark.version
# u'2.2.0'

df = spark.createDataFrame([[0, 33.3, -17.5],
                              [1, 40.4, -20.5],
                              [2, 28., -23.9],
                              [3, 29.5, -19.0],
                              [4, 32.8, -18.84]
                             ],
                              ["other","lat", "long"])

df.show()
# +-----+----+------+
# |other| lat|  long|
# +-----+----+------+
# |    0|33.3| -17.5|
# |    1|40.4| -20.5| 
# |    2|28.0| -23.9|
# |    3|29.5| -19.0|
# |    4|32.8|-18.84|
# +-----+----+------+

第 1 步 - 组装您的功能

与现有的大多数 ML 包相比,Spark ML 要求将输入特征收集在数据框的单列中,通常命名为 features;它提供了一个特定的方法来做到这一点,VectorAssembler:

from pyspark.ml.feature import VectorAssembler

vecAssembler = VectorAssembler(inputCols=["lat", "long"], outputCol="features")
new_df = vecAssembler.transform(df)
new_df.show()
# +-----+----+------+-------------+ 
# |other| lat|  long|     features|
# +-----+----+------+-------------+
# |    0|33.3| -17.5| [33.3,-17.5]|
# |    1|40.4| -20.5| [40.4,-20.5]|
# |    2|28.0| -23.9| [28.0,-23.9]| 
# |    3|29.5| -19.0| [29.5,-19.0]|
# |    4|32.8|-18.84|[32.8,-18.84]|
# +-----+----+------+-------------+ 

可能已经猜到了,参数inputCols 用于告诉VectoeAssembler 我们数据框中的哪些特定列将用作特征。

第 2 步 - 拟合您的 KMeans 模型

from pyspark.ml.clustering import KMeans

kmeans = KMeans(k=2, seed=1)  # 2 clusters here
model = kmeans.fit(new_df.select('features'))

select('features') 在这里用于告诉算法将数据帧的哪一列用于聚类 - 请记住,在上面的第 1 步之后,您原来的 latlong 特征将不再直接使用。

第 3 步 - 转换您的初始数据框以包含集群分配

transformed = model.transform(new_df)
transformed.show()    
# +-----+----+------+-------------+----------+ 
# |other| lat|  long|     features|prediction|
# +-----+----+------+-------------+----------+
# |    0|33.3| -17.5| [33.3,-17.5]|         0| 
# |    1|40.4| -20.5| [40.4,-20.5]|         1|
# |    2|28.0| -23.9| [28.0,-23.9]|         0|
# |    3|29.5| -19.0| [29.5,-19.0]|         0|
# |    4|32.8|-18.84|[32.8,-18.84]|         0|
# +-----+----+------+-------------+----------+

transformed 数据框的最后一列 prediction 显示了集群分配 - 在我的玩具案例中,我在集群 #0 中有 4 条记录,在集群 #1 中有 1 条记录。

您可以使用select 语句进一步操作transformed 数据框,甚至可以使用dropfeatures 列(现在已完成其功能,可能不再需要)...

希望您现在更接近您最初真正想要实现的目标。对于提取集群统计信息等,another recent answer of mine 可能会有所帮助...

【讨论】:

亲爱的desertnaut,非常感谢您抽出宝贵的时间编写我读过的最好的***答案。我一定会继续保持它的优秀来源。是的,你猜对了——我会问更多的问题! :) 我不知道我正在使用一些旧的、折旧的图书馆,我很高兴你向我展示了“正确的道路”。我明白了你出色的解释中的一切。一个小问题(与 Spark 相关比与 kMeans 相关更多):即使 df 很大,这是否可以 - 从存储和内存的角度来看 - 生成越来越多的新数据帧(df,然后是 df_new)? @user3245256 标准做法是将转换后的数据分配到新的数据帧中。无论如何,实验并看看...... 正如@desertnaut 提到的,为您的 ML 操作转换为 rdd 效率非常低。话虽如此,唉,即使pyspark.ml.clustering 库中的KMeans 方法在获取模型输出时仍然使用collect 函数。当在非常大的数据集上应用 Kmeans 时,这会使 spark 功能无用,并且您的所有工作节点都将处于空闲状态,并且只有您的驱动程序节点会超时工作【参考方案2】:

尽管我给出了其他一般性的回答,并且如果您出于某种原因必须坚持使用 MLlib 和 RDD,这就是导致您使用同一个玩具 df 时出错的原因。

当您将数据框中的select 列转换为 RDD 时,正如您所做的那样,结果是 Rows 的 RDD:

df.select('lat', 'long').rdd.collect()
# [Row(lat=33.3, long=-17.5), Row(lat=40.4, long=-20.5), Row(lat=28.0, long=-23.9), Row(lat=29.5, long=-19.0), Row(lat=32.8, long=-18.84)]

不适合作为 MLlib KMeans 的输入。您需要map 操作才能使其工作:

df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1])).collect()
# [(33.3, -17.5), (40.4, -20.5), (28.0, -23.9), (29.5, -19.0), (32.8, -18.84)]

所以,你的代码应该是这样的:

from pyspark.mllib.clustering import KMeans, KMeansModel

rdd = df.select('lat', 'long').rdd.map(lambda x: (x[0], x[1]))
clusters = KMeans.train(rdd, 2, maxIterations=10, initializationMode="random") # works OK
clusters.centers
# [array([ 40.4, -20.5]), array([ 30.9 , -19.81])]

【讨论】:

很好的补充。一件事,collect() 返回列表,您也可以将数据帧发送到 kmeans 训练模型。 我们使用collect 仅用于最终结果;如果我们可以在这里使用它,那么就没有理由打扰 Spark - 我们会使用 scikit-learn 或类似的东西更好......

以上是关于PySpark 中的 KMeans 聚类的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 中使用 Kmeans 对小短语进行聚类

在 python / pyspark 中获取 k-means 质心和异常值

如何评估 R 中的 kmeans 聚类性能

python - 如何在python中的4维数据上绘制kmeans聚类?

使用 R 中的 wordcloud 从聚类向量中显示单个 kmeans 聚类

如何在 python 中的 KMeans 聚类中获得 X 值?