Implementing K-medoids in Pyspark

Posted: 2019-04-26 13:05:26

[Question]:

I can't find a library that implements PAM (K-medoids) in Pyspark.

I found this one in Scala: https://gist.github.com/erikerlandson/c3c35f0b1aae737fc884

The corresponding Spark issue was resolved in 2016: https://issues.apache.org/jira/browse/SPARK-4510 and https://github.com/apache/spark/pull/3382

But it doesn't seem to work, and it is not included in the MLlib documentation: http://spark.apache.org/docs/2.0.0/api/python/pyspark.mllib.html#module-pyspark.mllib.clustering

Does anyone know of a PAM library for Pyspark?

Thanks

[Comments]:

[Answer 1]:

I actually had a play with this the other day. I can't say much about performance because I'm quite new to Spark, but here is K-medoids with K++ seeding:

# (c) 2020 Jonathan Kelsey
# This code is licensed under MIT license
from pyspark.sql import functions as F
import pyspark
import numpy as np
import sys

def seedKernel(dataB, dataIdValue, centeroids, k, metric):
    # Distance from a single point to its nearest already-chosen seed.
    data = dataB.value
    point = dataIdValue[1]
    minD = sys.maxsize
    for j in range(len(centeroids)):
        distance = metric(point, data[centeroids[j]])
        minD = min(minD, distance)
    return int(minD)

def seedClusters(dataB, dataFrame, k, metric):
    # K++-style seeding: start from one random point, then repeatedly add the
    # point that is farthest from the current seed set.
    data = dataB.value
    centeroids = list(np.random.choice(data.shape[0], 1, replace=False))
    for i in range(k - 1):
        print("clusterSeed", i)
        mK = dataFrame.rdd.map(lambda dataIdValue: seedKernel(dataB, dataIdValue, centeroids, k, metric))
        distances = np.array(mK.collect())
        nextCenteroid = np.argmax(distances)
        centeroids.append(nextCenteroid)
    print(centeroids)
    return centeroids

def nearestCenteroidKernel(dataIdValue, centeroidIdValues, metric):
    # Index of the centroid closest to a single data point.
    dataId, dataValue = dataIdValue
    dataNp = np.asarray(dataValue)
    distances = []
    for centeroidId, centeroidValue in centeroidIdValues:
        centeroidNp = np.asarray(centeroidValue)
        distances.append(metric(dataNp, centeroidNp))
    closestCenteroid = np.argmin(np.asarray(distances))
    return int(closestCenteroid)

def optimiseClusterMembershipSpark(data, dataFrame, n, metric, intitalClusterIndices=None):
    # Assignment step: attach every non-medoid point to its nearest medoid and
    # return the medoid indices plus a DataFrame of (bestC, [member ids]).
    dataRDD = dataFrame.rdd
    lengthOfData = data.shape[0]
    if intitalClusterIndices is None:
        index = np.random.choice(lengthOfData, n, replace=False)
    else:
        index = intitalClusterIndices
    listIndex = [int(i) for i in list(index)]
    centeroidIdValues = [(i, data[index[i]]) for i in range(len(index))]
    dataRDD = dataRDD.filter(lambda dataIdValue: int(dataIdValue["id"]) not in listIndex)
    associatedClusterPoints = dataRDD.map(lambda dataIdValue: (dataIdValue[0], nearestCenteroidKernel(dataIdValue, centeroidIdValues, metric)))
    clusters = associatedClusterPoints.toDF(["id", "bestC"]).groupBy("bestC").agg(F.collect_list("id").alias("cluster"))
    return index, clusters

def costKernel(dataB, testCenteroid, clusterData, metric):
    # Total distance from every point in a cluster to a candidate medoid.
    data = dataB.value
    cluster = np.asarray(clusterData)
    lenCluster = cluster.shape[0]
    lenFeature = data.shape[1]
    testCenteroidColumn = np.zeros(shape=(lenCluster, lenFeature), dtype=data.dtype)
    newClusterColumn = np.zeros(shape=(lenCluster, lenFeature), dtype=data.dtype)
    for i in range(0, lenCluster):
        newClusterColumn[i] = data[cluster[i]]
        testCenteroidColumn[i] = data[int(testCenteroid)]
    pairwiseDistance = metric(newClusterColumn, testCenteroidColumn)
    return float(np.sum(pairwiseDistance))

def optimiseCentroidSelectionSpark(dataB, dataFrame, centeroids, clustersFrames, metric):
    # Update step: within each cluster, pick the member that minimises the
    # total distance to all other members (the new medoid), and accumulate the
    # total cost of the current configuration.
    newCenteroidIds = []
    totalCost = 0
    for clusterIdx in range(len(centeroids)):
        print("clusterOpIdx", clusterIdx)
        oldCenteroid = centeroids[clusterIdx]
        clusterFrame = clustersFrames.filter(clustersFrames.bestC == clusterIdx).select(F.explode(clustersFrames.cluster))
        clusterData = [row.col for row in clusterFrame.collect()]
        costData = clusterFrame.rdd.map(lambda pointId: (pointId[0], costKernel(dataB, pointId[0], clusterData, metric)))
        cost = costData.map(lambda pointIdCost: pointIdCost[1]).sum()
        totalCost = totalCost + cost
        pointResult = costData.sortBy(lambda pointId_Cost: pointId_Cost[1]).take(1)
        if pointResult:
            bestPoint = pointResult[0][0]
        else:
            # empty cluster: keep the old medoid
            bestPoint = oldCenteroid
        newCenteroidIds.append(bestPoint)
    return (newCenteroidIds, totalCost)

# vector metrics (row-wise, used for the medoid-update cost)
def hammingVector(stack1, stack2):
    return (stack1 != stack2).sum(axis=1)
def euclideanVector(stack1, stack2):
    # squared euclidean distance per row, consistent with euclideanPoint below
    return ((stack2 - stack1) ** 2).sum(axis=1)
# point metrics (single pair of points, used for seeding and assignment)
def euclideanPoint(p1, p2):
    return np.sum((p1 - p2)**2)
def hammingPoint(p1, p2):
    return np.sum((p1 != p2))

def fit(sc, data, nRegions = 2, metric = "euclidean", seeding = "heuristic"):
    # Main driver: alternates the assignment and medoid-update steps until the
    # total cost stops improving.
    if metric == "euclidean":
        pointMetric = euclideanPoint
        vectorMetric = euclideanVector
    elif metric == "hamming":
        pointMetric = hammingPoint
        vectorMetric = hammingVector
    else:
        print("unsupported metric")
        return

    dataN = np.asarray(data)
    dataB = sc.broadcast(dataN)
    seeds = None
    # (id, vector) DataFrame so every point can be addressed by its index
    dataFrame = sc.parallelize(data).zipWithIndex().map(lambda xy: (xy[1], xy[0])).toDF(["id", "vector"]).cache()
    if (seeding == "heuristic"):
        seeds = list(seedClusters(dataB, dataFrame, nRegions, pointMetric))
    lastCenteroids, lastClusters = optimiseClusterMembershipSpark(dataN, dataFrame, nRegions, pointMetric, seeds)
    lastCost = float('inf')
    iteration = 0
    escape = False
    while not escape:
        iteration = iteration + 1
        currentCenteroids, currentCost = optimiseCentroidSelectionSpark(dataB, dataFrame, lastCenteroids, lastClusters, vectorMetric)
        currentCenteroids, currentClusters = optimiseClusterMembershipSpark(dataN, dataFrame, nRegions, pointMetric, currentCenteroids)
        print((currentCost<lastCost, currentCost, lastCost, currentCost - lastCost))
        if (currentCost<lastCost):
            print(("iteration",iteration,"cost improving...", currentCost, lastCost))
            lastCost = currentCost
            lastCenteroids = currentCenteroids
            lastClusters = currentClusters
        else:
            print(("iteration",iteration,"cost got worse or did not improve", currentCost, lastCost))
            escape = True
    bc = lastClusters.collect()
    unpackedClusters = [bc[i].cluster for i in range(len(bc))]
    return (lastCenteroids, unpackedClusters)

I used some sample data from pyclustering as a sanity check:

from pyclustering.cluster import cluster_visualizer
from pyclustering.utils import read_sample
from pyclustering.samples.definitions import FCPS_SAMPLES
from pyclustering.samples.definitions import SIMPLE_SAMPLES
sample = read_sample(FCPS_SAMPLES.SAMPLE_GOLF_BALL)
bestCentroids, bestClusters = fit(sc, sample, 9)
visualizer = cluster_visualizer()
visualizer.append_clusters(bestClusters, sample)
visualizer.show()
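
A quick, hedged usage sketch with the hamming metric: binary_sample below is just a made-up name for a list of equal-length 0/1 vectors, sc is assumed to be a live SparkContext, and any seeding value other than "heuristic" simply falls back to random initial medoids in the code above.

# Hypothetical binary data; replace with your own 0/1 feature vectors.
binary_sample = [[0, 1, 1, 0], [0, 1, 0, 0], [1, 0, 0, 1], [1, 1, 0, 1]]

# seeding="random" (anything other than "heuristic") skips the greedy seeding
# and lets optimiseClusterMembershipSpark choose random initial medoids.
centroids, clusters = fit(sc, binary_sample, nRegions=2, metric="hamming", seeding="random")
print(centroids, clusters)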

[Comments]:

[Answer 2]:

Your best bet is to adapt this Python implementation to Scala, so that you can work with RDD partitions and distributed computation from the start: https://github.com/letiantian/kmedoids/blob/master/kmedoids.py
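
If it helps, here is a rough single-machine sketch of the kind of PAM loop you would be porting (my own minimal version over a precomputed distance matrix, not the linked file); the distance matrix construction and the per-cluster medoid search are the parts worth distributing:

import numpy as np

def k_medoids(D, k, max_iter=100, seed=None):
    # D is a precomputed (n, n) pairwise distance matrix.
    rng = np.random.default_rng(seed)
    n = D.shape[0]
    medoids = rng.choice(n, size=k, replace=False)
    for _ in range(max_iter):
        # assignment step: attach every point to its nearest medoid
        labels = np.argmin(D[:, medoids], axis=1)
        new_medoids = medoids.copy()
        for c in range(k):
            members = np.where(labels == c)[0]
            if members.size == 0:
                continue
            # update step: the member with the smallest total in-cluster distance
            costs = D[np.ix_(members, members)].sum(axis=1)
            new_medoids[c] = members[np.argmin(costs)]
        if np.array_equal(np.sort(new_medoids), np.sort(medoids)):
            break
        medoids = new_medoids
    return medoids, np.argmin(D[:, medoids], axis=1)

# toy usage with Manhattan distances on random 2-D points
points = np.random.rand(50, 2)
D = np.abs(points[:, None, :] - points[None, :, :]).sum(axis=2)
medoids, labels = k_medoids(D, k=3)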

[Comments]:
