PySpark ML:获取 KMeans 集群统计信息

Posted

技术标签:

【中文标题】PySpark ML:获取 KMeans 集群统计信息【英文标题】:PySpark ML: Get KMeans cluster statistics 【发布时间】:2018-04-18 05:22:13 【问题描述】:

我已经建立了一个 KMeansModel。我的结果存储在一个名为 transformed

(a) 如何解读transformed的内容?

(b) 如何从 transformed 创建一个或多个 Pandas DataFrame,以显示 14 个集群中每个集群的 13 个特征中的每一个的摘要统计信息?

from pyspark.ml.clustering import KMeans
# Trains a k-means model.
kmeans = KMeans().setK(14).setSeed(1)
model = kmeans.fit(X_spark_scaled) # Fits a model to the input dataset with optional parameters.

transformed = model.transform(X_spark_scaled).select("features", "prediction") # X_spark_scaled is my PySpark DataFrame consisting of 13 features
transformed.show(5, truncate = False)
+------------------------------------------------------------------------------------------------------------------------------------+----------+
|features                                                                                                                            |prediction|
+------------------------------------------------------------------------------------------------------------------------------------+----------+
|(14,[4,5,7,8,9,13],[1.0,1.0,485014.0,0.25,2.0,1.0])                                                                                 |12        |
|(14,[2,7,8,9,12,13],[1.0,2401233.0,1.0,1.0,1.0,1.0])                                                                                |2         |
|(14,[2,4,5,7,8,9,13],[0.3333333333333333,0.6666666666666666,0.6666666666666666,2429111.0,0.9166666666666666,1.3333333333333333,3.0])|2         |
|(14,[4,5,7,8,9,12,13],[1.0,1.0,2054748.0,0.15384615384615385,11.0,1.0,1.0])                                                         |11        |
|(14,[2,7,8,9,13],[1.0,43921.0,1.0,1.0,1.0])                                                                                         |1         |
+------------------------------------------------------------------------------------------------------------------------------------+----------+
only showing top 5 rows

顺便说一句,我从另一个 SO 帖子中发现,我可以将这些功能映射到它们的名称,如下所示。在一个或多个 Pandas 数据帧中为每个集群的每个特征提供汇总统计信息(平均值、中值、标准、最小值、最大值)会很好。

attr_list = [attr for attr in chain(*transformed.schema['features'].metadata['ml_attr']['attrs'].values())]
attr_list

根据 cmets 中的请求,这里是由 2 条数据记录组成的快照(不想提供太多记录——这里是专有信息)

+---------------------+------------------------+-----------------------+----------------------+----------------------+------------------------------+---------------------------------+------------+-------------------+--------------------+------------------------------------+--------------------------+-------------------------------+-----------------+--------------------+--------------------+
|device_type_robot_pct|device_type_smart_tv_pct|device_type_desktop_pct|device_type_tablet_pct|device_type_mobile_pct|device_type_mobile_persist_pct|visitors_seen_with_anonymiser_pct|ip_time_span|          ip_weight|mean_ips_per_visitor|visitors_seen_with_multi_country_pct|international_visitors_pct|visitors_seen_with_multi_ua_pct|count_tuids_on_ip|            features|      scaledFeatures|
+---------------------+------------------------+-----------------------+----------------------+----------------------+------------------------------+---------------------------------+------------+-------------------+--------------------+------------------------------------+--------------------------+-------------------------------+-----------------+--------------------+--------------------+
|                  0.0|                     0.0|                    0.0|                   0.0|                   1.0|                           1.0|                              0.0|    485014.0|               0.25|                 2.0|                                 0.0|                       0.0|                            0.0|              1.0|(14,[4,5,7,8,9,13...|(14,[4,5,7,8,9,13...|
|                  0.0|                     0.0|                    1.0|                   0.0|                   0.0|                           0.0|                              0.0|   2401233.0|                1.0|                 1.0|                                 0.0|                       0.0|                            1.0|              1.0|(14,[2,7,8,9,12,1...|(14,[2,7,8,9,12,1...|

【问题讨论】:

能否也提供一个初始数据样本X_spark_scaled 我宁愿使用 sklearn 而不是 pyspark。运行基准测试,哪个更快?哪个更容易使用?哪个发现更好的集群? @Anony-Mousse 我实际上尝试过 sklearn,但我有大约 6000 亿条记录,这对于 sklearn 来说太多了,无法记忆 @desertnaut 今天晚些时候会做 这不仅仅是您的问题的“切线”:pyspark 比 sklearn 更受限制,如果您使用 sklearn 而不是 pyspark(不是原生 Python),那么问题会很多更容易回答。 【参考方案1】:

正如 Anony-Mousse 所评论的那样,(Py)Spark ML 确实比 scikit-learn 或其他类似软件包更多受限,而且这样的功能并非微不足道;不过,这里有一种方法可以得到你想要的(集群统计):

spark.version
# u'2.2.0'

from pyspark.ml.clustering import KMeans
from pyspark.ml.linalg import Vectors

# toy data - 5-d features including sparse vectors
df = spark.createDataFrame(
 [(Vectors.sparse(5,[(0, 164.0),(1,520.0)]), 1.0),
  (Vectors.dense([519.0,2723.0,0.0,3.0,4.0]), 1.0),
  (Vectors.sparse(5,[(0, 2868.0), (1, 928.0)]), 1.0),
  (Vectors.sparse(5,[(0, 57.0), (1, 2715.0)]), 0.0),
  (Vectors.dense([1241.0,2104.0,0.0,0.0,2.0]), 1.0)],
 ["features", "target"])

df.show()
# +--------------------+------+ 
# |            features|target| 
# +--------------------+------+ 
# |(5,[0,1],[164.0,5...|   1.0|
# |[519.0,2723.0,0.0...|   1.0| 
# |(5,[0,1],[2868.0,...|   1.0|
# |(5,[0,1],[57.0,27...|   0.0| 
# |[1241.0,2104.0,0....|   1.0|
# +--------------------+------+

kmeans = KMeans(k=3, seed=1)
model = kmeans.fit(df.select('features'))

transformed = model.transform(df).select("features", "prediction")
transformed.show()
# +--------------------+----------+
# |            features|prediction|
# +--------------------+----------+
# |(5,[0,1],[164.0,5...|         1| 
# |[519.0,2723.0,0.0...|         2|
# |(5,[0,1],[2868.0,...|         0|
# |(5,[0,1],[57.0,27...|         2|
# |[1241.0,2104.0,0....|         2|
# +--------------------+----------+

到这里,关于你的第一个问题:

如何解读transformed的内容?

features 列只是原始数据中同一列的复制。

prediction 列是相应数据记录所属的集群;在我的示例中,有 5 条数据记录和 k=3 集群,我最终在集群 #0 中获得 1 条记录,在集群 #1 中获得 1 条记录,在集群 #2 中获得 3 条记录。

关于你的第二个问题:

如何从 transformed 创建一个或多个 Pandas DataFrame,以显示 14 个集群中每个集群的 13 个特征中的每一个的摘要统计信息?

(注意:您似乎有 14 个功能,而不是 13 个...)

这是一个看似简单的任务的一个很好的例子,不幸的是,PySpark 没有提供现成的功能 - 尤其是因为所有功能都分组在一个单个向量features中;为此,我们必须首先“反汇编”features,有效地提出VectorAssemblerinvert 操作。

我目前能想到的唯一方法是暂时恢复到 RDD 并执行 map 操作 [编辑:这不是真的必要 - 请参阅下面的更新];这是上面我的集群 #2 的示例,其中包含密集和稀疏向量:

# keep only cluster #2:
cl_2 = transformed.filter(transformed.prediction==2)
cl_2.show() 
# +--------------------+----------+ 
# |            features|prediction|
# +--------------------+----------+
# |[519.0,2723.0,0.0...|         2|
# |(5,[0,1],[57.0,27...|         2|
# |[1241.0,2104.0,0....|         2| 
# +--------------------+----------+

# set the data dimensionality as a parameter:
dimensionality = 5

cluster_2 = cl_2.drop('prediction').rdd.map(lambda x: [float(x[0][i]) for i in range(dimensionality)]).toDF(schema=['x'+str(i) for i in range(dimensionality)])
cluster_2.show()
# +------+------+---+---+---+ 
# |    x0|    x1| x2| x3| x4|
# +------+------+---+---+---+
# | 519.0|2723.0|0.0|3.0|4.0|
# |  57.0|2715.0|0.0|0.0|0.0| 
# |1241.0|2104.0|0.0|0.0|2.0|
# +------+------+---+---+---+

(如果您的初始数据位于 Spark 数据框 initial_data 中,则可以将最后一部分更改为 toDF(schema=initial_data.columns),以保留原始特征名称。)

从这一点开始,您可以将 cluster_2 数据帧转换为 pandas 数据帧(如果它适合您的记忆),或者使用 Spark 数据帧的 describe() 函数来获取您的摘要统计信息:

cluster_2.describe().show()
# result:
+-------+-----------------+-----------------+---+------------------+---+ 
|summary|               x0|               x1| x2|                x3| x4|
+-------+-----------------+-----------------+---+------------------+---+ 
|  count|                3|                3|  3|                 3|  3|
|   mean|605.6666666666666|           2514.0|0.0|               1.0|2.0|
| stddev|596.7389155512932|355.0929455790413|0.0|1.7320508075688772|2.0|
|    min|             57.0|           2104.0|0.0|               0.0|0.0|
|    max|           1241.0|           2723.0|0.0|               3.0|4.0|
+-------+-----------------+-----------------+---+------------------+---+

在你的情况下使用上面的代码和dimensionality=14 应该可以完成这项工作......

meanstddev 中的所有这些(可以说是无用的)有效数字感到恼火?作为奖励,这是我提出的一个小实用函数 some time ago 进行了漂亮的总结:

def prettySummary(df):
    """ Neat summary statistics of a Spark dataframe
    Args:
        pyspark.sql.dataframe.DataFrame (df): input dataframe
    Returns:
        pandas.core.frame.DataFrame: a pandas dataframe with the summary statistics of df
    """
    import pandas as pd
    temp = df.describe().toPandas()
    temp.iloc[1:3,1:] = temp.iloc[1:3,1:].convert_objects(convert_numeric=True)
    pd.options.display.float_format = ':,.2f'.format
    return temp

stats_df = prettySummary(cluster_2)
stats_df
# result:
    summary     x0       x1   x2   x3   x4
 0  count        3        3    3    3    3 
 1   mean   605.67 2,514.00 0.00 1.00 2.00 
 2 stddev   596.74   355.09 0.00 1.73 2.00 
 3    min     57.0   2104.0  0.0  0.0  0.0 
 4    max   1241.0   2723.0  0.0  3.0  4.0

更新:再想一想,看到您的示例数据,我想出了一个更直接的解决方案,而无需调用中间 RDD(可以说是人们更愿意避免的操作) ,如果可能的话)...

关键观察是transformed的完整内容,即没有select语句;保持与上面相同的玩具数据集,我们得到:

transformed = model.transform(df)  # no 'select' statements
transformed.show()
# +--------------------+------+----------+
# |            features|target|prediction| 
# +--------------------+------+----------+
# |(5,[0,1],[164.0,5...|   1.0|         1|
# |[519.0,2723.0,0.0...|   1.0|         2|
# |(5,[0,1],[2868.0,...|   1.0|         0|
# |(5,[0,1],[57.0,27...|   0.0|         2|
# |[1241.0,2104.0,0....|   1.0|         2|
# +--------------------+------+----------+

如您所见,要转换的数据框 df 中存在的任何其他列(在我的情况下只有一个 - target)只是“通过”转换过程并最终出现在最终结果……

希望您开始明白这一点:如果df 包含您最初的 14 个功能,每个功能位于单独的列中,再加上名为 features 的第 15 列(大致如您的示例数据中所示,但没有最后一列) ,然后是下面的代码:

kmeans = KMeans().setK(14)
model = kmeans.fit(df.select('features'))
transformed = model.transform(df).drop('features')

将为您留下一个包含 15 列的 Spark 数据框 transformed,即您的初始 14 个特征加上一个带有相应集群编号的 prediction 列。

从这一点开始,您可以像我在上面显示的那样继续从transformedfilter 特定集群并获取您的汇总统计信息,但是您将避免(昂贵的...)转换为中间临时 RDD,从而将您的所有操作保持在更高效的 Spark 数据帧上下文中...

【讨论】:

“PySpark 不提供现成的功能......”完全同意。我们喜欢 sklearn,但我们庞大的数据量迫使我们使用 PySpark。 “提出 VectorAssembler 的反转操作”是的,它简洁地指定了我需要做的事情。非常感谢您提供详细而有用的答案。 @user2205916 不客气;重新“VectorAssembler 的反转操作” - 正如我在更新中解释的那样,如果您已经拥有初始功能,则没有必要 是的,所以我真的在我的脑海里问了一个错误的问题,这里也问错了问题 (#2)。但是你给出了正确的答案!我没有完全理解我的错,我用select 过滤掉了我的初始功能,并且不知道它们一开始就在那里。再次感谢! @user2205916 不要责怪自己——即使在docs,他们总是陪伴transformselect("features", "prediction");我最初喜欢 RDD 解决方案并非偶然......

以上是关于PySpark ML:获取 KMeans 集群统计信息的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中运行 KMeans 聚类

sparklyr ml_kmeans 字段“功能”不存在

如何获取相关矩阵值pyspark

初学Mahout测试kmeans算法

如何从 KMeans 集群中获取集群的名称?

在非 Spark 环境中加载 pyspark ML 模型