Spark高效的groupby操作-重新分区?

Posted

技术标签:

【中文标题】Spark高效的groupby操作-重新分区?【英文标题】:Spark efficient groupby operation - repartition? 【发布时间】:2019-05-30 14:41:17 【问题描述】:

我正在使用 pyspark 2.3,我正在尝试找出从数据框中获取一些汇总统计信息的最有效方法。

我有一个包含 15 亿条记录的数据框,分布在一个由 10 个节点组成的相对较小的集群中。每个都有 16gb 的 ram 和 4 个内核。我的复制因子设置为 2。

我的数据框可能有 15 列,它们是数据类型的混合,但我只对两列感兴趣 - ID 和 eventDate。我想运行的代码很简单:

output = df.groupby(['ID']).agg(F.min('eventDate').alias("firstDate"),F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet',mode='overwrite')

我想弄清楚的是执行此操作的最有效方法。 ID,即我分组依据的字段,有 12m 个值,df.rdd.getNumPartitions() 目前是 642。

我最好先将我的数据框投影到我想要的两列吗?有这么多 ID,我应该先重新分区我的数据集吗?我应该删除重复项吗?我可以在我的 groupby 之前运行这样的东西:

df = df[['ID','eventDate']].drop_duplicates().repartition(x)

df = df[['ID','eventDate']].repartition(x)

我正在努力弄清楚什么会优化运行时。任何有关预先确定运行时的指导将不胜感激。如果可能的话,我不希望只是“测试一下”,因为我有几个这样的查询要运行,每个都需要一段时间。

【问题讨论】:

试穿小一号或解释一下。 DF 是柱状的。 15 亿很小 200 应该是 AGGR 的默认并行度。 【参考方案1】:

这可能不是您正在寻找的答案,但此操作的最佳代码正是

output = df.groupby(['ID']). \
 agg(F.min('eventDate').alias("firstDate"), F.max('eventDate').alias("lastDate"))
output.write.parquet('hdfs:///somewhere/dateFile.parquet', mode='overwrite')

Spark 通过仅首先选择整个操作所需的必要列来优化流程。然后 Spark 按ID 对您的数据进行分区,并在每个分区上开始聚合过程。

允许最大数量的执行者肯定会有所帮助。我建议(根据您的描述)设置spark.executor.instances=10; spark.executor.memory=10g。 12m 个值是一个合理的数量,也许尝试增加 shuffle 分区的数量,f.e. spark.sql.shuffle.partitions=400,这样您就不会遇到一些烦人的内存开销异常。

【讨论】:

【参考方案2】:

@flyingmeatball,

在进行聚合之前,请执行以下步骤

1 - 删除不需要的数据(它会吃掉你的资源)。

2-根据您的数据重新分区和缓存数据(它将消除执行时间)

提示:如果数据来自 Cassandra,则按分区键重新分区数据,以避免数据混洗

现在你可以使用聚合逻辑了;)

谢谢, 维马利什

【讨论】:

你能详细说明一下吗?对于不需要的数据,这是否意味着 drop_duplicates()?删除重复项不需要事先传递数据吗?我的数据来自 CSV 文件 - 我还应该按键分区吗?目前没有。 嗨,在您的源 DF 中,只需删除不需要的列,因为在 DF 操作期间它会占用大量资源。像这样重新分区 df.repartition(column_going_to_aggregate, 1000) 它将减少洗牌过程。谢谢, 谢谢 - 您如何选择 1000 作为重新分区的编号? 那是一个随机数。根据您的需要。请调整此参数。谢谢 这就是我要问的——我该怎么做?

以上是关于Spark高效的groupby操作-重新分区?的主要内容,如果未能解决你的问题,请参考以下文章

Spark中的最佳重新分区方式

Spark DataFrame重新分区:未保留的分区数

为啥在 Spark 中重新分区比 partitionBy 快?

Spark:按键重新分区输出

Spark:持久化和重新分区顺序

Spark 重新分区执行器