多个小型 RDD 的高效联合
Posted
技术标签:
【中文标题】多个小型 RDD 的高效联合【英文标题】:Efficient union of multiple small RDDs 【发布时间】:2016-10-31 19:12:58 【问题描述】:我有一系列多个小文件 (~1-8KB),我想计算这些文件的字数。具体来说,我拥有的序列是files: Seq[String]
,其中序列的每个字符串都是每个文件的路径。我根据该序列计算总字数的方法是:
val totalWordCount = sc.union(
files.map(path => sc.textFile(path))
).flatMap(line => line.split(" "))
.map((_,1))
// I use a hash partitioner for better performance of reduceByKey
.partitionBy(new HashPartitioner(numPartitions))
.reduceByKey(_ + _)
我遇到的问题是,即使我有超过 10000 个小文件并使用上述技术,当我增加分区时,执行时间也会增加。这是为什么呢?
请注意,我不能从一开始就将这些小文件合并为一个,作为输入,我需要字符串序列。
【问题讨论】:
【参考方案1】:为什么慢
sc.textFile
没有针对这种情况进行优化。请记住,最佳分区大小约为 100 MB,而现在,您的 sc.union
RDD 为每个文件获取一个分区 -
您在问题中提到了“增加分区”,但我认为您可能希望减少分区的数量。我不确定numPartitions
来自哪里,但这应该是大约总数据大小/100MB。您的.partitionBy
步骤正在执行完全洗牌,因此原始的太多分区 RDD 仍然会有很多开销,但它可能会在下游执行得更好。
另一个可以尝试的执行模型
还有一些其他的尝试:联合上的无洗牌合并:
val optimalNPartitions = ??? // calculate total size / 100MB here
val totalWordCount = sc.union(files.map(path => sc.textFile(path)))
.flatMap(line => line.split(" "))
.coalesce(optimalNPartitions, shuffle = false) // try with shuf = true as well!
.map((_,1))
.reduceByKey(_ + _)
最后一点
虽然您说要分区到一个新的哈希分区器以提高 reduceByKey 的效率,但这实际上是错误的。
让我们看看这两个模型。首先,您拥有的那个:partitionBy
,然后是 reduceByKey
。分区步骤将对新的哈希分区器进行完全洗牌——所有数据都需要在网络中移动。当你调用 reduce 时,所有类似的键都已经在同一个地方,所以不需要发生 shuffle。
其次,省略partitionBy
,直接拨打reduceByKey
。在这个模型中,你进入reduce
没有分区,所以你必须洗牌。但是在你洗牌每个键之前,你将在本地减少 - 如果你在一个分区上使用“狗”这个词 100 次,你将洗牌 ("dog", 100)
而不是 ("dog", 1)
100 次。看看我要去哪里? Reduce 实际上只需要一个部分 shuffle,其大小由键的稀疏性决定(如果你只有几个唯一的键,很少会被 shuffle。如果一切都是唯一的,那么一切都会被 shuffle)。
显然模型 2 是我们想要的。摆脱那个partitionBy
!
【讨论】:
以上是关于多个小型 RDD 的高效联合的主要内容,如果未能解决你的问题,请参考以下文章
哪个是高效的,Dataframe 或 RDD 或 hiveql?
高效 告别996,开启java高效编程之门 3-7实战:常用中间操作演示之:过滤/映射/扁平化 filter/map/flagMap