工作人员之间的平衡 RDD 分区 - Spark
Posted
技术标签:
【中文标题】工作人员之间的平衡 RDD 分区 - Spark【英文标题】:Balanced RDD partition among workers - Spark 【发布时间】:2017-08-17 16:40:45 【问题描述】:我正在使用 RDD
中的 x: key, y: set(values)
称为 file
。
#values: RDD of tuples (key, val)
file = values.groupByKey().mapValues(set).cache()
info_file = array(file.map(lambda (x,y): len(y)).collect())
var = np.var(info_file) #extremely high
def f():
...
file.foreachPartition(f)
len(y)
的方差非常高,因此大约 1% 的对集合(使用百分位数方法验证)占集合 total = np.sum(info_file)
中值总数的 20%。
如果 Spark 使用 shuffle 随机分区,那么这 1% 很可能会落在同一个分区中,从而导致工作人员之间的负载不平衡。
有没有办法确保“重”元组在分区之间均匀分布?
实际上,我根据threshold = np.percentile(info_file,99.9)
给出的阈值len(y)
将file
拆分为两个分区heavy
和light
,以便分离这组元组然后重新分区。
light = file.filter(lambda (x,y): len(y) < threshold).cache()
heavy = file.filter(lambda (x,y): len(y) >= threshold).cache()
light.foreachPartition(f)
heavy.foreachPartition(f)
但获得几乎相同的运行时间。负载可能已经优化,只是想检查我是否可以做更多/更好的事情。
【问题讨论】:
【参考方案1】:您可以使用 Ganglia 来监控集群负载。这应该可以很好地指示任何可能导致集群负载不均的数据倾斜。
如果您确实遇到了不幸的数据倾斜,可以通过重组数据或加盐键等方式与之抗争。参见例如this *** Q&A。
请注意,您现在也可以将数据拆分为heavy
分区和light
分区,但在这种情况下,您希望cache
和file
- 而不是heavy
和light
- 因为它是file
,你需要处理多次。像这样:
cachedFile = file.cache()
light = cachedFile.filter(lambda (x,y): len(y) < threshold)
heavy = cachedFile.filter(lambda (x,y): len(y) >= threshold)
light.foreachPartition(f)
heavy.foreachPartition(f)
希望对您有所帮助。
【讨论】:
以上是关于工作人员之间的平衡 RDD 分区 - Spark的主要内容,如果未能解决你的问题,请参考以下文章