Pyspark:在数据帧的不同组上应用 kmeans

Posted

技术标签:

【中文标题】Pyspark:在数据帧的不同组上应用 kmeans【英文标题】:Pyspark: applying kmeans on different groups of a dataframe 【发布时间】:2018-04-23 18:47:31 【问题描述】:

使用 Pyspark 我想将 kmeans 分别应用于数据帧组,而不是一次应用于整个数据帧。目前,我使用了一个 for 循环,该循环在每个组上进行迭代,应用 kmeans 并将结果附加到另一个表中。但是有很多组会很耗时。任何人都可以帮助我吗? 非常感谢!

for customer in customer_list:
    temp_df = togroup.filter(col("customer_id")==customer)
    df = assembler.transform(temp_df)
    k = 1
    while (k < 5 & mtrc < width):
        k += 1
        kmeans = KMeans(k=k,seed=5,maxIter=20,initSteps=5)
        model = kmeans.fit(df)
        mtric = 1 - model.computeCost(df)/ttvar
        a = model.transform(df)select(cols)
        allcustomers = allcustomers .union(a)

【问题讨论】:

请将您正在使用的代码附加到问题中。 这不是一个愚蠢的问题,所以我不明白反对意见。我从未尝试过集群,但似乎窗口函数可能在这里工作。 【参考方案1】:

我想出了第二个解决方案,我认为它比上一个解决方案略好。这个想法是使用groupby()collect_list() 并编写一个将列表作为输入并生成集群的udf。在我们编写的另一个解决方案中继续使用df_spark

df_flat = df_spark.groupby('cat').agg(F.collect_list('val').alias('val_list'))

现在我们编写 udf 函数:

import numpy as np
import pyspark.sql.functions as F
from sklearn.cluster import KMeans
from pyspark.sql.types import *
def skmean(x):
    kmeans = KMeans(n_clusters=2, random_state=0)
    X = np.array(x).reshape(-1,1)  
    kmeans.fit(X)
    clusters = kmeans.predict(X).tolist()
    return(clusters)
clustering_udf = F.udf(lambda arr : skmean(arr), ArrayType(IntegerType()))

然后将 udf 应用到展平的数据框:

df = df_flat.withColumn('clusters', clustering_udf(F.col('val')))

然后你可以使用F.explode()将列表转换为列。

【讨论】:

不应该是df = df_flat.withColumn('clusters', clustering_udf(F.col('val_list')))吗? @pmjn6 至少在您之前的回答中,如果安装了 pyarrow,那么 pandas 方法可能会稍微快一些。随着数据集大小的增加,这种方法会很快变得昂贵。我认为 Eva 的原始方法可能是最好的,因为它完全在 pyspark 中完成。虽然 for 循环笨重且效率低下。【参考方案2】:

我想出了一个使用 pandas_udf 的解决方案。首选纯 spark 或 scala 解决方案,但尚未提供。 假设我的数据是

import pandas as pd
df_pd = pd.DataFrame([['cat1',10.],['cat1',20.],['cat1',11.],['cat1',21.],['cat1',22.],['cat1',9.],['cat2',101.],['cat2',201.],['cat2',111.],['cat2',214.],['cat2',224.],['cat2',99.]],columns=['cat','val'])
df_sprk = spark.createDataFrame(df_pd)

先解决pandas中的问题:

from sklearn.cluster import KMeans
kmeans = KMeans(n_clusters=2,random_state=0)

def skmean(kmeans,x):
    X = np.array(x)
    kmeans.fit(X)
    return(kmeans.predict(X))

您可以将 skmean() 应用于熊猫数据框(以确保其正常工作):

df_pd.groupby('cat').apply(lambda x:skmean(kmeans,x)).reset_index()

要将函数应用于 pyspark 数据帧,我们使用 pandas_udf。但首先为输出数据框定义一个模式:

from pyspark.sql.types import *
schema = StructType(
       [StructField('cat',StringType(),True),
        StructField('clusters',ArrayType(IntegerType()))])

将上面的函数转换为pandas_udf:

from pyspark.sql.functions import pandas_udf
from pyspark.sql.functions import PandasUDFType  

@pandas_udf(schema, functionType=PandasUDFType.GROUPED_MAP)
def skmean_udf(df):
    result = pd.DataFrame(
             df.groupby('cat').apply(lambda x: skmean(kmeans,x))
    result.reset_index(inplace=True, drop=False)
    return(result)

您可以按如下方式使用该功能:

df_spark.groupby('cat').apply(skmean_udf).show()

【讨论】:

问题中发布的纯 pyspark 解决方案真的更快吗?

以上是关于Pyspark:在数据帧的不同组上应用 kmeans的主要内容,如果未能解决你的问题,请参考以下文章

为啥同组聚类数据点在 Kmeans 聚类中落得较远或分散?

在 PySpark 中运行 KMeans 聚类

PySpark 中的 KMeans 聚类

应用模型后Pyspark提取转换数据帧的概率[重复]

如何按行将函数应用于 PySpark 数据帧的一组列?

加入两个 Pyspark 数据帧的两种方法有啥区别