Kmeans 聚类与火花中的地图减少

Posted

技术标签:

【中文标题】Kmeans 聚类与火花中的地图减少【英文标题】:Kmeans clustering with map reduce in spark 【发布时间】:2022-01-15 21:22:10 【问题描述】:

您好,有人可以帮助我使用 Spark 使用 Kmeans 进行 map reduce。实际上可以用火花做 Kmeans,但我不知道如何映射和减少它。 谢谢。

【问题讨论】:

【参考方案1】:

您不需要编写 map-reduce。您可以使用 spark 数据框 API 并使用 Spark ML 库。

您可以在此处阅读有关它的更多信息。

https://spark.apache.org/docs/latest/ml-clustering.html

【讨论】:

【参考方案2】:

下面为您的练习建议的伪代码:

centroids = k 个数据集中的随机采样点

地图:

给定一个点和一组质心

计算点到每个质心的距离

发射点和最近的质心

减少:

给定质心和属于其集群的点

计算新的质心作为点的算术平均位置

发射新的质心

prev_centroids = 质心

质心 = new_centroids

而 prev_centroids - 质心 > 阈值

映射器类计算数据点和每个质心之间的距离。然后发出最近质心的索引和数据点:

class MAPPER
method MAP(file_offset, point)
    min_distance = POSITIVE_INFINITY
    closest_centroid = -1
    for all centroid in list_of_centroids
        distance = distance(centroid, point)
        if (distance < min_distance)
            closest_centroid = index_of(centroid)
            min_distance = distance
    EMIT(closest_centroid, point) 

reducer 计算质心的新近似值并发出它。

class REDUCER
method REDUCER(centroid_index, list_of_point_sums)
    number_of_points = partial_sum.number_of_points
    point_sum = 0
    for all partial_sum in list_of_partial_sums:
        point_sum += partial_sum
        point_sum.number_of_points += partial_sum.number_of_points
    centroid_value = point_sum / point_sum.number_of_points
    EMIT(centroid_index, centroid_value)

实际的 K-Means Spark 实现:

首先,您读取包含点的文件并使用随机采样生成初始质心,使用 takeSample(False, k):此函数从 RDD 中获取 k 个随机样本,无需替换;因此,应用程序以分布式方式生成初始质心,避免将所有数据移动到驱动程序。您可以在迭代算法中重用 RDD,因此使用 cache() 将其缓存在内存中,以避免每次触发操作时重新评估它:

points = sc.textFile(INPUT_PATH).map(Point).cache()
initial_centroids = init_centroids(points, k=parameters["k"])

def init_centroids(dataset, k):
    start_time = time.time()
    initial_centroids = dataset.takeSample(False, k)
    print("init centroid execution:", len(initial_centroids), "in", 
    (time.time() - start_time), "s")
    return initial_centroids

之后,您迭代 mapper 和 reducer 阶段,直到验证停止标准或达到最大迭代次数。

while True:
    print("--Iteration n. itr:d".format(itr=n+1), end="\r", 
    flush=True)
    cluster_assignment_rdd = points.map(assign_centroids)
    sum_rdd = cluster_assignment_rdd.reduceByKey(lambda x, y: x.sum(y))
    centroids_rdd = sum_rdd.mapValues(lambda x: 
    x.get_average_point()).sortByKey(ascending=True)

    new_centroids = [item[1] for item in centroids_rdd.collect()]
    stop = stopping_criterion(new_centroids,parameters["threshold"])

    n += 1
    if(stop == False and n < parameters["maxiteration"]):
        centroids_broadcast = sc.broadcast(new_centroids)
    else:
        break

停止条件是这样计算的:

def stopping_criterion(new_centroids, threshold):
    old_centroids = centroids_broadcast.value
    for i in range(len(old_centroids)):
        check = old_centroids[i].distance(new_centroids[i], 
        distance_broadcast.value) <= threshold
        if check == False:
            return False
    return True

为了表示点,定义了一个Point类。它的特点是以下几个领域:

一个 numpyarray 组件 点数:一个点可以看成是很多点的聚合,所以这个变量用来跟踪物体所代表的点数

包括以下操作:

距离(可以作为参数传递距离的类型)

总和

get_average_point:此方法返回一个点,该点以对象表示的点数的实际分量的平均值作为分量

类点: def init(自我,行): 值 = line.split(",") self.components = np.array([round(float(k), 5) for k in values]) self.number_of_points = 1

  def sum(self, p):
      self.components = np.add(self.components, p.components)
      self.number_of_points += p.number_of_points
      return self

  def distance(self, p, h):
      if (h < 0):
         h = 2
      return linalg.norm(self.components - p.components, h)

  def get_average_point(self):
      self.components = np.around(np.divide(self.components, 
      self.number_of_points), 5)
      return self

在每次迭代时,在输入文件上调用映射器方法,该文件包含数据集中的点

cluster_assignment_rdd = points.map(assign_centroids)

assign_centroids 函数,对于被调用的每个点,将最近的质心分配给该点。质心取自广播变量。该函数将结果作为元组返回(质心的id,点)

 def assign_centroids(p):
     min_dist = float("inf")
     centroids = centroids_broadcast.value
     nearest_centroid = 0
     for i in range(len(centroids)):
         distance = p.distance(centroids[i], distance_broadcast.value)
         if(distance < min_dist):
             min_dist = distance
             nearest_centroid = i
     return (nearest_centroid, p)

reduce 阶段使用两个 spark 转换完成:

reduceByKey:对于每个集群,计算属于它的点的总和。必须将一个关联函数作为参数传递。关联函数(接受两个参数并返回单个元素)在数学性质上应该是可交换的和关联的

sum_rdd = cluster_assignment_rdd.reduceByKey(lambda x, y: x.sum(y))

mapValues:用于计算每个阶段结束时每个簇的平均点。这些点已经按键划分。这种转换仅适用于键的值。对结果进行排序以便于比较。

centroids_rdd = sum_rdd.mapValues(lambda x: x.get_average_point()).sortBy(lambda x: x[1].components[0])

get_average_point() 函数返回新计算的质心。

 def get_average_point(self):
     self.components = np.around(np.divide(self.components, 
     self.number_of_points), 5)
     return self

【讨论】:

以上是关于Kmeans 聚类与火花中的地图减少的主要内容,如果未能解决你的问题,请参考以下文章

谱聚类为啥要用到kmeans

聚类算法-Kmeans算法的简单实现

聚类算法--KMeans

如何评估 R 中的 kmeans 聚类性能

使用 R 中的 wordcloud 从聚类向量中显示单个 kmeans 聚类

K-means 与KNN 聚类算法