如何在pyspark中运行多个k意味着集群和使用groupBy
Posted
技术标签:
【中文标题】如何在pyspark中运行多个k意味着集群和使用groupBy【英文标题】:How to run multiple k means clustering and use groupBy in pyspark 【发布时间】:2021-06-01 17:38:03 【问题描述】:我有一个这样的数据集:
|Seq_key| |Class_id| |value|
Seq_key 1 Class_id 1 value 1
Seq_key 1 Class_id 2 value 2
Seq_key 1 Class_id 3 value 3
Seq_key 1 Class_id 4 value 4
Seq_key 1 Class_id 5 value 5
Seq_key 1 Class_id 6 value 6
Seq_key 2 Class_id 1 value 1
Seq_key 2 Class_id 2 value 2
Seq_key 2 Class_id 3 value 3
Seq_key 2 Class_id 4 value 4
Seq_key 2 Class_id 5 value 5
Seq_key 2 Class_id 6 value 6
Seq_key 2 Class_id 7 value 7
Seq_key 3 Class_id 1 value 1
Seq_key 3 Class_id 2 value 2
Seq_key 3 Class_id 3 value 3
Seq_key 3 Class_id 4 value 4
Seq_key 3 Class_id 5 value 5
Seq_key 3 Class_id 6 value 6
Seq_key 3 Class_id 7 value 7
Seq_key 3 Class_id 8 value 8
每个Seq_key
的Class_ids
和values
是互斥的。
我对每个 Seq_key
应用 k-means 聚类,并找到最佳数量的聚类、质心等,以便每个 Seq_key
的输出如下所示:
|Seq_key| |Class id| |Cluster| |Centroid|
Seq_key 1 Class_id 1 1 128
Seq_key 1 Class_id 2 2 56
Seq_key 1 Class_id 3 3 100
Seq_key 1 Class_id 4 1 128
Seq_key 1 Class_id 5 1 128
Seq_key 1 Class_id 6 4 72
Seq_key 2 Class_id 1 1 5.5
Seq_key 2 Class_id 2 1 5.5
Seq_key 2 Class_id 3 2 3.4
Seq_key 2 Class_id 4 3 1.7
Seq_key 2 Class_id 5 1 5.5
Seq_key 2 Class_id 6 2 3.4
Seq_key 2 Class_id 7 2 3.4
Seq_key 3 Class_id 1 4 500
Seq_key 3 Class_id 2 1 700
Seq_key 3 Class_id 3 3 274
Seq_key 3 Class_id 4 2 189
Seq_key 3 Class_id 5 2 189
Seq_key 3 Class_id 6 4 500
Seq_key 3 Class_id 7 1 700
Seq_key 3 Class_id 8 3 274
目前,我手动循环遍历每个 Seq_key
并应用 pyspark.ml.clustering
库中的 k-means 算法。但这显然是低效的,因为seq_keys
的数量增加到数万。另外,我没有正确利用 spark 的分布式计算。
Seq_key
互斥,不能与其他Seq_keys
聚类
有没有办法通过ml
库中的groupBy
类似方法来实现我的输出?
即使只是计算由Seq_key
分组的质心就足够了。
这可能吗?
【问题讨论】:
【参考方案1】:您也许可以通过水平并行来改进运行时间,即并行运行多个 Spark 作业,如下所示:
from multiprocessing.pool import ThreadPool
from multiprocessing import cpu_count
def run_kmeans(seqid, data=sens):
df_tmp=data.filter(col('SEQ_ID')==seqid)\
.select('SEQ_KEY','CLASS_ID','value')
for c in df_tmp.columns:
if c in FEATURE_COLS:
df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
df_tmp=df_tmp.na.drop()
vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
vector_df = vecAssembler.transform(df_tmp)
bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
model = bkm.fit(vector_df)
cluster=model.transform(vector_df).drop('features')
return cluster
pool = ThreadPool(cpu_count())
fleet_clusters = pool.map(run_kmeans, fleets)
【讨论】:
感谢您的建议!我会试试看。你能解释一下run_kmeans
函数中的df
arg 是什么吗?我假设pool.map
函数接收列表fleets
的内容并对其内容并行运行run_kmeans
函数。那将是 seqid
arg 作为输入。但是df
是什么?
抱歉,错字应该是df_tmp
另一个愚蠢的问题。 seqid
是我解决方案中 for 循环的迭代器。我还需要用你的方法运行一个 for 循环吗?此外,df_tmp
是在函数内部创建的虚拟数据框。我们是否应该将其作为输入 arg 传递给 run_kmeans
函数?
现在修复了错误,这能回答你上面的问题吗?
当我运行代码时出现错误:Can't pickle <function run_kmeans at 0x7f077ee30d40>: attribute lookup run_kmeans on __main__ failed
我的 spark 版本是 2.4.6【参考方案2】:
所以我实施了一个临时解决方案,从这个post 中获取想法。
我收集了一个不同的 Seq_keys
列表,然后手动循环遍历每个列表并应用 pyspark kmeans 方法,如下所示:
from pyspark.ml.clustering import BisectingKMeans
from pyspark.ml.feature import VectorAssembler
fleets=list(sens.select('SEQ_KEY').distinct().toPandas()['SEQ_KEY'])
for seqid in fleets:
df_tmp=sens.filter(col('SEQ_ID')==seqid)\
.select('SEQ_KEY','CLASS_ID','value')
for c in df_tmp.columns:
if c in FEATURE_COLS:
df_tmp=df_tmp.withColumn(c, df_tmp[c].cast("float"))
df_tmp=df_tmp.na.drop()
vecAssembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
vector_df = vecAssembler.transform(df_tmp)
bkm = BisectingKMeans().setK(4).setSeed(1).setFeaturesCol("features")
model = bkm.fit(vector_df)
cluster=model.transform(vector_df).drop('features')
fleet_clusters.append(cluster)
final_clustered_fleet=reduce(DataFrame.unionByName, fleet_clusters)
我暂时不考虑质心。只需获取集群信息就足够了。
这显然是肮脏和低效的。事实上,由于 kmeans 函数调用了 collect
方法,我的作业运行大约需要 8 个小时。我 90% 的工作节点处于空闲状态。
如果有一种更有效的方法来做到这一点,最好利用 spark 提供的多个工作节点,那就太好了。
【讨论】:
以上是关于如何在pyspark中运行多个k意味着集群和使用groupBy的主要内容,如果未能解决你的问题,请参考以下文章