Kmeans 聚类与火花中的地图减少
Posted
技术标签:
【中文标题】Kmeans 聚类与火花中的地图减少【英文标题】:Kmeans clustering with map reduce in spark 【发布时间】:2022-01-15 21:22:10 【问题描述】:您好,有人可以帮助我使用 Spark 使用 Kmeans 进行 map reduce。实际上可以用火花做 Kmeans,但我不知道如何映射和减少它。 谢谢。
【问题讨论】:
【参考方案1】:您不需要编写 map-reduce。您可以使用 spark 数据框 API 并使用 Spark ML 库。
您可以在此处阅读有关它的更多信息。
https://spark.apache.org/docs/latest/ml-clustering.html
【讨论】:
【参考方案2】:下面为您的练习建议的伪代码:
centroids = k 个数据集中的随机采样点
地图:
给定一个点和一组质心
计算点到每个质心的距离
发射点和最近的质心
减少:
给定质心和属于其集群的点
计算新的质心作为点的算术平均位置
发射新的质心
prev_centroids = 质心
质心 = new_centroids
而 prev_centroids - 质心 > 阈值
映射器类计算数据点和每个质心之间的距离。然后发出最近质心的索引和数据点:
class MAPPER
method MAP(file_offset, point)
min_distance = POSITIVE_INFINITY
closest_centroid = -1
for all centroid in list_of_centroids
distance = distance(centroid, point)
if (distance < min_distance)
closest_centroid = index_of(centroid)
min_distance = distance
EMIT(closest_centroid, point)
reducer 计算质心的新近似值并发出它。
class REDUCER
method REDUCER(centroid_index, list_of_point_sums)
number_of_points = partial_sum.number_of_points
point_sum = 0
for all partial_sum in list_of_partial_sums:
point_sum += partial_sum
point_sum.number_of_points += partial_sum.number_of_points
centroid_value = point_sum / point_sum.number_of_points
EMIT(centroid_index, centroid_value)
实际的 K-Means Spark 实现:
首先,您读取包含点的文件并使用随机采样生成初始质心,使用 takeSample(False, k):此函数从 RDD 中获取 k 个随机样本,无需替换;因此,应用程序以分布式方式生成初始质心,避免将所有数据移动到驱动程序。您可以在迭代算法中重用 RDD,因此使用 cache() 将其缓存在内存中,以避免每次触发操作时重新评估它:
points = sc.textFile(INPUT_PATH).map(Point).cache()
initial_centroids = init_centroids(points, k=parameters["k"])
def init_centroids(dataset, k):
start_time = time.time()
initial_centroids = dataset.takeSample(False, k)
print("init centroid execution:", len(initial_centroids), "in",
(time.time() - start_time), "s")
return initial_centroids
之后,您迭代 mapper 和 reducer 阶段,直到验证停止标准或达到最大迭代次数。
while True:
print("--Iteration n. itr:d".format(itr=n+1), end="\r",
flush=True)
cluster_assignment_rdd = points.map(assign_centroids)
sum_rdd = cluster_assignment_rdd.reduceByKey(lambda x, y: x.sum(y))
centroids_rdd = sum_rdd.mapValues(lambda x:
x.get_average_point()).sortByKey(ascending=True)
new_centroids = [item[1] for item in centroids_rdd.collect()]
stop = stopping_criterion(new_centroids,parameters["threshold"])
n += 1
if(stop == False and n < parameters["maxiteration"]):
centroids_broadcast = sc.broadcast(new_centroids)
else:
break
停止条件是这样计算的:
def stopping_criterion(new_centroids, threshold):
old_centroids = centroids_broadcast.value
for i in range(len(old_centroids)):
check = old_centroids[i].distance(new_centroids[i],
distance_broadcast.value) <= threshold
if check == False:
return False
return True
为了表示点,定义了一个Point类。它的特点是以下几个领域:
一个 numpyarray 组件 点数:一个点可以看成是很多点的聚合,所以这个变量用来跟踪物体所代表的点数包括以下操作:
距离(可以作为参数传递距离的类型)
总和
get_average_point:此方法返回一个点,该点以对象表示的点数的实际分量的平均值作为分量
类点: def init(自我,行): 值 = line.split(",") self.components = np.array([round(float(k), 5) for k in values]) self.number_of_points = 1
def sum(self, p):
self.components = np.add(self.components, p.components)
self.number_of_points += p.number_of_points
return self
def distance(self, p, h):
if (h < 0):
h = 2
return linalg.norm(self.components - p.components, h)
def get_average_point(self):
self.components = np.around(np.divide(self.components,
self.number_of_points), 5)
return self
在每次迭代时,在输入文件上调用映射器方法,该文件包含数据集中的点
cluster_assignment_rdd = points.map(assign_centroids)
assign_centroids 函数,对于被调用的每个点,将最近的质心分配给该点。质心取自广播变量。该函数将结果作为元组返回(质心的id,点)
def assign_centroids(p):
min_dist = float("inf")
centroids = centroids_broadcast.value
nearest_centroid = 0
for i in range(len(centroids)):
distance = p.distance(centroids[i], distance_broadcast.value)
if(distance < min_dist):
min_dist = distance
nearest_centroid = i
return (nearest_centroid, p)
reduce 阶段使用两个 spark 转换完成:
reduceByKey:对于每个集群,计算属于它的点的总和。必须将一个关联函数作为参数传递。关联函数(接受两个参数并返回单个元素)在数学性质上应该是可交换的和关联的
sum_rdd = cluster_assignment_rdd.reduceByKey(lambda x, y: x.sum(y))
mapValues:用于计算每个阶段结束时每个簇的平均点。这些点已经按键划分。这种转换仅适用于键的值。对结果进行排序以便于比较。
centroids_rdd = sum_rdd.mapValues(lambda x: x.get_average_point()).sortBy(lambda x: x[1].components[0])
get_average_point() 函数返回新计算的质心。
def get_average_point(self):
self.components = np.around(np.divide(self.components,
self.number_of_points), 5)
return self
【讨论】:
以上是关于Kmeans 聚类与火花中的地图减少的主要内容,如果未能解决你的问题,请参考以下文章