优化 Spark combineByKey

Posted

技术标签:

【中文标题】优化 Spark combineByKey【英文标题】:Optimizing Spark combineByKey 【发布时间】:2016-06-02 14:16:45 【问题描述】:

我正在尝试使用具有 4.5 tb 内存的集群来处理大约 2 tb 的数据集。数据采用镶木地板格式,最初加载到数据框中。然后查询数据的子集并将其转换为 RDD 以进行更复杂的处理。该处理的第一阶段是 mapToPair 使用每行 id 作为元组中的键。然后数据通过 combineByKey 操作对具有相同键的所有值进行分组。此操作总是超过最大集群内存,作业最终失败。虽然它正在洗牌,但有很多“内存映射溢出到磁盘”消息。我想知道我是否要对数据进行初始分区,以便所有具有相同 id 的行都驻留在同一个分区中,如果它需要进行左洗牌并正确执行。

要执行我正在使用的初始加载:

sqlContext.read().parquet(inputPathArray).repartition(10000, new Column("id"));

我不确定这是否是分区数据帧的正确方法,所以我的第一个问题是上述正确的。

我的下一个问题是,当我从数据框转到 rdd 时,使用:

JavaRDD<LocationRecord> locationsForSpecificKey = sqlc.sql("SELECT * FROM standardlocationrecords WHERE customerID = " + customerID + " AND partnerAppID = " + partnerAppID)
                    .toJavaRDD().map(new LocationRecordFromRow()::apply);

是保留数据帧的分区方案,还是在使用 mapToPair 后需要重新分区:

rdd.partitionBy 并传入一个使用 ID 字段哈希的自定义 HashPartitioner。

我的目标是在执行最终 combineByKey 时减少洗牌,以防止作业内存不足和失败。任何帮助将不胜感激。

谢谢, 内森

【问题讨论】:

【参考方案1】:

我不确定这是否是分区数据帧的正确方法

看起来差不多。

是保留的数据帧中的分区方案

应保留数据分布,通过查看debugString 可以轻松检查:

val df = sqlContext.read.parquet("/tmp/foo").repartition(10000, $"id")

df.rdd.toDebugString
// String =
// (10000) MapPartitionsRDD[46] at rdd at <console>:26 []
//    |    ShuffledRowRDD[45] at rdd at <console>:26 []
//    +-(8) MapPartitionsRDD[44] at rdd at <console>:26 []
//       |  $anon$1[43] at  []

但没有为输出 RDD 设置分区器:

df.rdd.partitioner
// Option[org.apache.spark.Partitioner] = None

因此此信息不能用于优化后续聚合。

我的目标是减少洗牌

如果是这样,它看起来不是一个正确的方法。假设传递给combineByKeymergeValue 函数是一个归约操作,您实际上比直接应用combineByKey 更容易洗牌。如果不是这种情况,那么应用 combineByKey 并将 mapSideCombine 设置为 false 可能是更好的选择。

根据组合逻辑,您还应该考虑直接在DataFrame 上执行聚合。

【讨论】:

如果是这样,请参阅下面的讨论 ***.com/a/37580350/1560062 和 ***.com/q/37189802/1560062 需要注意的是,combineByKey 的输出类型与输入类型不同,所以我不确定是否可以使用 groupByKey。 如果您认为这是必需的,我会尝试DataFrame.repartition 并将combineByKeymapSideCombine=false 一起使用。如果单个(键,值)对仍然太大而无法放入内存,那么最终合并仍然会失败。

以上是关于优化 Spark combineByKey的主要内容,如果未能解决你的问题,请参考以下文章

spark学习之作业优化

万字宝典 |《 Spark性能优化全书》推荐收藏!

Spark 优化 | 图文理解 Spark 3.0 的动态分区裁剪优化

spark是如何改进优化hadoop框架的

spark 调度优化

spark优化