如何在pyspark中运行多个k意味着集群和使用groupBy

Posted

技术标签:

【中文标题】如何在pyspark中运行多个k意味着集群和使用groupBy【英文标题】:How to run multiple k means clustering and use groupBy in pyspark 【发布时间】:2021-06-01 17:38:03 【问题描述】:

我有一个这样的数据集:

|Seq_key|   |Class_id|  |value|
Seq_key 1   Class_id 1  value 1
Seq_key 1   Class_id 2  value 2
Seq_key 1   Class_id 3  value 3
Seq_key 1   Class_id 4  value 4
Seq_key 1   Class_id 5  value 5
Seq_key 1   Class_id 6  value 6
Seq_key 2   Class_id 1  value 1
Seq_key 2   Class_id 2  value 2
Seq_key 2   Class_id 3  value 3
Seq_key 2   Class_id 4  value 4
Seq_key 2   Class_id 5  value 5
Seq_key 2   Class_id 6  value 6
Seq_key 2   Class_id 7  value 7
Seq_key 3   Class_id 1  value 1
Seq_key 3   Class_id 2  value 2
Seq_key 3   Class_id 3  value 3
Seq_key 3   Class_id 4  value 4
Seq_key 3   Class_id 5  value 5
Seq_key 3   Class_id 6  value 6
Seq_key 3   Class_id 7  value 7
Seq_key 3   Class_id 8  value 8

每个Seq_keyClass_idsvalues 是互斥的。 我对每个 Seq_key 应用 k-means 聚类,并找到最佳数量的聚类、质心等,以便每个 Seq_key 的输出如下所示:

|Seq_key|   |Class id|  |Cluster|  |Centroid|
Seq_key 1   Class_id 1     1         128
Seq_key 1   Class_id 2     2         56
Seq_key 1   Class_id 3     3         100
Seq_key 1   Class_id 4     1         128
Seq_key 1   Class_id 5     1         128
Seq_key 1   Class_id 6     4         72
Seq_key 2   Class_id 1     1         5.5
Seq_key 2   Class_id 2     1         5.5
Seq_key 2   Class_id 3     2         3.4
Seq_key 2   Class_id 4     3         1.7
Seq_key 2   Class_id 5     1         5.5
Seq_key 2   Class_id 6     2         3.4
Seq_key 2   Class_id 7     2         3.4
Seq_key 3   Class_id 1     4         500
Seq_key 3   Class_id 2     1         700
Seq_key 3   Class_id 3     3         274
Seq_key 3   Class_id 4     2         189
Seq_key 3   Class_id 5     2         189
Seq_key 3   Class_id 6     4         500
Seq_key 3   Class_id 7     1         700
Seq_key 3   Class_id 8     3         274

目前,我手动循环遍历每个 Seq_key 并应用 pyspark.ml.clustering 库中的 k-means 算法。但这显然是低效的,因为seq_keys 的数量增加到数万。另外,我没有正确利用 spark 的分布式计算。

Seq_key 互斥,不能与其他Seq_keys 聚类 有没有办法通过ml 库中的groupBy 类似方法来实现我的输出? 即使只是计算由Seq_key 分组的质心就足够了。 这可能吗?

【问题讨论】:

【参考方案1】:

您也许可以通过水平并行来改进运行时间,即并行运行多个 Spark 作业,如下所示:

from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count

def run_kmeans(seqid, data=sens):

    df_tmp=data.filter(col('SEQ_ID')==seqid)\
        .select('SEQ_KEY','CLASS_ID','value')
    for c in df_tmp.columns:
        if c in FEATURE_COLS:
            df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
    df_tmp=df_tmp.na.drop()
    vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
    vector_df = vecAssembler.transform(df_tmp)
    bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
    model = bkm.fit(vector_df)
    cluster=model.transform(vector_df).drop('features')

    return cluster

pool = ThreadPool(cpu_count())
fleet_clusters = pool.map(run_kmeans, fleets)

【讨论】:

感谢您的建议!我会试试看。你能解释一下run_kmeans 函数中的df arg 是什么吗?我假设pool.map 函数接收列表fleets 的内容并对其内容并行运行run_kmeans 函数。那将是 seqid arg 作为输入。但是df 是什么? 抱歉,错字应该是df_tmp 另一个愚蠢的问题。 seqid 是我解决方案中 for 循环的迭代器。我还需要用你的方法运行一个 for 循环吗?此外,df_tmp 是在函数内部创建的虚拟数据框。我们是否应该将其作为输入 arg 传递给 run_kmeans 函数? 现在修复了错误,这能回答你上面的问题吗? 当我运行代码时出现错误:Can't pickle <function run_kmeans at 0x7f077ee30d40>: attribute lookup run_kmeans on __main__ failed 我的 spark 版本是 2.4.6【参考方案2】:

所以我实施了一个临时解决方案,从这个post 中获取想法。

我收集了一个不同的 Seq_keys 列表,然后手动循环遍历每个列表并应用 pyspark kmeans 方法,如下所示:

from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import VectorAssembler    
fleets=list(sens.select('SEQ_KEY').distinct().toPandas()['SEQ_KEY'])
for seqid in fleets:
    df_tmp=sens.filter(col('SEQ_ID')==seqid)\
    .select('SEQ_KEY','CLASS_ID','value')
    for c in df_tmp.columns:
        if c in FEATURE_COLS:
            df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
    df_tmp=df_tmp.na.drop()
    vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
    vector_df = vecAssembler.transform(df_tmp)
    bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
    model = bkm.fit(vector_df)
    cluster=model.transform(vector_df).drop('features')
    fleet_clusters.append(cluster)

final_clustered_fleet=reduce(DataFrame.unionByName, fleet_clusters)

我暂时不考虑质心。只需获取集群信息就足够了。

这显然是肮脏和低效的。事实上,由于 kmeans 函数调用了 collect 方法,我的作业运行大约需要 8 个小时。我 90% 的工作节点处于空闲状态。 如果有一种更有效的方法来做到这一点,最好利用 spark 提供的多个工作节点,那就太好了。

【讨论】:

以上是关于如何在pyspark中运行多个k意味着集群和使用groupBy的主要内容,如果未能解决你的问题,请参考以下文章

PySpark任务在YARN集群上运行python 算法

PySpark任务在YARN集群上运行python 算法

Pyspark:K 表示模型拟合时的聚类误差

Pyspark 数据分布

寻找有关如何使用 python 启动 AWS EMR 集群以运行 pyspark 步骤的示例

如何在 Pyspark 中使用 @pandas_udf 返回多个数据帧?