多个小型 RDD 的高效联合

Posted

技术标签:

【中文标题】多个小型 RDD 的高效联合【英文标题】:Efficient union of multiple small RDDs 【发布时间】:2016-10-31 19:12:58 【问题描述】:

我有一系列多个小文件 (~1-8KB),我想计算这些文件的字数。具体来说,我拥有的序列是files: Seq[String],其中序列的每个字符串都是每个文件的路径。我根据该序列计算总字数的方法是:

val totalWordCount = sc.union(
      files.map(path => sc.textFile(path))
    ).flatMap(line => line.split(" "))
      .map((_,1))
      // I use a hash partitioner for better performance of reduceByKey
      .partitionBy(new HashPartitioner(numPartitions))
      .reduceByKey(_ + _)

我遇到的问题是,即使我有超过 10000 个小文件并使用上述技术,当我增加分区时,执行时间也会增加。这是为什么呢?

请注意,我不能从一开始就将这些小文件合并为一个,作为输入,我需要字符串序列。

【问题讨论】:

【参考方案1】:

为什么慢

sc.textFile 没有针对这种情况进行优化。请记住,最佳分区大小约为 100 MB,而现在,您的 sc.union RDD 为每个文件获取一个分区 -

您在问题中提到了“增加分区”,但我认为您可能希望减少分区的数量。我不确定numPartitions 来自哪里,但这应该是大约总数据大小/100MB。您的.partitionBy 步骤正在执行完全洗牌,因此原始的太多分区 RDD 仍然会有很多开销,但它可能会在下游执行得更好。

另一个可以尝试的执行模型

还有一些其他的尝试:联合上的无洗牌合并:

val optimalNPartitions = ??? // calculate total size / 100MB here
val totalWordCount = sc.union(files.map(path => sc.textFile(path)))
  .flatMap(line => line.split(" "))
  .coalesce(optimalNPartitions, shuffle = false) // try with shuf = true as well!
  .map((_,1))
  .reduceByKey(_ + _)

最后一点

虽然您说要分区到一个新的哈希分区器以提高 reduceByKey 的效率,但这实际上是错误的。

让我们看看这两个模型。首先,您拥有的那个:partitionBy,然后是 reduceByKey。分区步骤将对新的哈希分区器进行完全洗牌——所有数据都需要在网络中移动。当你调用 reduce 时,所有类似的键都已经在同一个地方,所以不需要发生 shuffle。

其次,省略partitionBy,直接拨打reduceByKey。在这个模型中,你进入reduce 没有分区,所以你必须洗牌。但是在你洗牌每个键之前,你将在本地减少 - 如果你在一个分区上使用“狗”这个词 100 次,你将洗牌 ("dog", 100) 而不是 ("dog", 1) 100 次。看看我要去哪里? Reduce 实际上只需要一个部分 shuffle,其大小由键的稀疏性决定(如果你只有几个唯一的键,很少会被 shuffle。如果一切都是唯一的,那么一切都会被 shuffle)。

显然模型 2 是我们想要的。摆脱那个partitionBy

【讨论】:

以上是关于多个小型 RDD 的高效联合的主要内容,如果未能解决你的问题,请参考以下文章

哪个是高效的,Dataframe 或 RDD 或 hiveql?

Spark高效数据分析04RDD创建

高效 告别996,开启java高效编程之门 3-7实战:常用中间操作演示之:过滤/映射/扁平化 filter/map/flagMap

Spark RDD理解

对包含对象的大量小型 .mat 文件进行高效磁盘访问

Spark RDD 核心总结