优化 Spark combineByKey
Posted
技术标签:
【中文标题】优化 Spark combineByKey【英文标题】:Optimizing Spark combineByKey 【发布时间】:2016-06-02 14:16:45 【问题描述】:我正在尝试使用具有 4.5 tb 内存的集群来处理大约 2 tb 的数据集。数据采用镶木地板格式,最初加载到数据框中。然后查询数据的子集并将其转换为 RDD 以进行更复杂的处理。该处理的第一阶段是 mapToPair 使用每行 id 作为元组中的键。然后数据通过 combineByKey 操作对具有相同键的所有值进行分组。此操作总是超过最大集群内存,作业最终失败。虽然它正在洗牌,但有很多“内存映射溢出到磁盘”消息。我想知道我是否要对数据进行初始分区,以便所有具有相同 id 的行都驻留在同一个分区中,如果它需要进行左洗牌并正确执行。
要执行我正在使用的初始加载:
sqlContext.read().parquet(inputPathArray).repartition(10000, new Column("id"));
我不确定这是否是分区数据帧的正确方法,所以我的第一个问题是上述正确的。
我的下一个问题是,当我从数据框转到 rdd 时,使用:
JavaRDD<LocationRecord> locationsForSpecificKey = sqlc.sql("SELECT * FROM standardlocationrecords WHERE customerID = " + customerID + " AND partnerAppID = " + partnerAppID)
.toJavaRDD().map(new LocationRecordFromRow()::apply);
是保留数据帧的分区方案,还是在使用 mapToPair 后需要重新分区:
rdd.partitionBy 并传入一个使用 ID 字段哈希的自定义 HashPartitioner。
我的目标是在执行最终 combineByKey 时减少洗牌,以防止作业内存不足和失败。任何帮助将不胜感激。
谢谢, 内森
【问题讨论】:
【参考方案1】:我不确定这是否是分区数据帧的正确方法
看起来差不多。
是保留的数据帧中的分区方案
应保留数据分布,通过查看debugString
可以轻松检查:
val df = sqlContext.read.parquet("/tmp/foo").repartition(10000, $"id")
df.rdd.toDebugString
// String =
// (10000) MapPartitionsRDD[46] at rdd at <console>:26 []
// | ShuffledRowRDD[45] at rdd at <console>:26 []
// +-(8) MapPartitionsRDD[44] at rdd at <console>:26 []
// | $anon$1[43] at []
但没有为输出 RDD 设置分区器:
df.rdd.partitioner
// Option[org.apache.spark.Partitioner] = None
因此此信息不能用于优化后续聚合。
我的目标是减少洗牌
如果是这样,它看起来不是一个正确的方法。假设传递给combineByKey
的mergeValue
函数是一个归约操作,您实际上比直接应用combineByKey
更容易洗牌。如果不是这种情况,那么应用 combineByKey
并将 mapSideCombine
设置为 false 可能是更好的选择。
根据组合逻辑,您还应该考虑直接在DataFrame
上执行聚合。
【讨论】:
如果是这样,请参阅下面的讨论 ***.com/a/37580350/1560062 和 ***.com/q/37189802/1560062 需要注意的是,combineByKey 的输出类型与输入类型不同,所以我不确定是否可以使用 groupByKey。 如果您认为这是必需的,我会尝试DataFrame.repartition
并将combineByKey
与mapSideCombine=false
一起使用。如果单个(键,值)对仍然太大而无法放入内存,那么最终合并仍然会失败。以上是关于优化 Spark combineByKey的主要内容,如果未能解决你的问题,请参考以下文章