Spark高效的groupby操作-重新分区?
Posted
技术标签:
【中文标题】Spark高效的groupby操作-重新分区?【英文标题】:Spark efficient groupby operation - repartition? 【发布时间】:2019-05-30 14:41:17 【问题描述】:我正在使用 pyspark 2.3,我正在尝试找出从数据框中获取一些汇总统计信息的最有效方法。
我有一个包含 15 亿条记录的数据框,分布在一个由 10 个节点组成的相对较小的集群中。每个都有 16gb 的 ram 和 4 个内核。我的复制因子设置为 2。
我的数据框可能有 15 列,它们是数据类型的混合,但我只对两列感兴趣 - ID 和 eventDate。我想运行的代码很简单:
output = df.groupby(['ID']).agg(F.min('eventDate').alias("firstDate"),F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet',mode='overwrite')
我想弄清楚的是执行此操作的最有效方法。 ID,即我分组依据的字段,有 12m 个值,df.rdd.getNumPartitions() 目前是 642。
我最好先将我的数据框投影到我想要的两列吗?有这么多 ID,我应该先重新分区我的数据集吗?我应该删除重复项吗?我可以在我的 groupby 之前运行这样的东西:
df = df[['ID','eventDate']].drop_duplicates().repartition(x)
或
df = df[['ID','eventDate']].repartition(x)
我正在努力弄清楚什么会优化运行时。任何有关预先确定运行时的指导将不胜感激。如果可能的话,我不希望只是“测试一下”,因为我有几个这样的查询要运行,每个都需要一段时间。
【问题讨论】:
试穿小一号或解释一下。 DF 是柱状的。 15 亿很小 200 应该是 AGGR 的默认并行度。 【参考方案1】:这可能不是您正在寻找的答案,但此操作的最佳代码正是
output = df.groupby(['ID']). \
agg(F.min('eventDate').alias("firstDate"), F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet', mode='overwrite')
Spark 通过仅首先选择整个操作所需的必要列来优化流程。然后 Spark 按ID
对您的数据进行分区,并在每个分区上开始聚合过程。
允许最大数量的执行者肯定会有所帮助。我建议(根据您的描述)设置spark.executor.instances=10; spark.executor.memory=10g
。 12m 个值是一个合理的数量,也许尝试增加 shuffle 分区的数量,f.e. spark.sql.shuffle.partitions=400
,这样您就不会遇到一些烦人的内存开销异常。
【讨论】:
【参考方案2】:@flyingmeatball,
在进行聚合之前,请执行以下步骤
1 - 删除不需要的数据(它会吃掉你的资源)。
2-根据您的数据重新分区和缓存数据(它将消除执行时间)
提示:如果数据来自 Cassandra,则按分区键重新分区数据,以避免数据混洗
现在你可以使用聚合逻辑了;)
谢谢, 维马利什
【讨论】:
你能详细说明一下吗?对于不需要的数据,这是否意味着 drop_duplicates()?删除重复项不需要事先传递数据吗?我的数据来自 CSV 文件 - 我还应该按键分区吗?目前没有。 嗨,在您的源 DF 中,只需删除不需要的列,因为在 DF 操作期间它会占用大量资源。像这样重新分区 df.repartition(column_going_to_aggregate, 1000) 它将减少洗牌过程。谢谢, 谢谢 - 您如何选择 1000 作为重新分区的编号? 那是一个随机数。根据您的需要。请调整此参数。谢谢 这就是我要问的——我该怎么做?以上是关于Spark高效的groupby操作-重新分区?的主要内容,如果未能解决你的问题,请参考以下文章