大型数据框上的 Pyspark groupBy

Posted

技术标签:

【中文标题】大型数据框上的 Pyspark groupBy【英文标题】:Pyspark groupBy on large dataframe 【发布时间】:2020-12-16 01:41:09 【问题描述】:

我们有一个非常大的 Pyspark Dataframe,我们需要对其执行 groupBy 操作。

我们已经尝试过

df_gp=df.groupBy('some_column').count()

而且它需要很长时间(它已经运行了超过 17 小时没有结果)。

我也试过

df_gp=df.groupBy('some_column').agg(count)

但据我所知,行为是相同的。

更多上下文:

我们正在使用 %spark2.pyspark 解释器在 Zeppelin(版本 0.8.0)上运行此操作 Zeppelin 正在 Yarn 客户端上运行 数据存储在 Hive (Hive 3.1.0.3.1.0.0-78) 初始数据帧是通过使用 llap 查询 Hive 创建的:
from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()

req=""" SELECT *
        FROM table
        where isodate='2020-07-27'
    """

df = hive.executeQuery(req)
数据框大小约为 6000 万行,9 列 在同一环境中对同一 Dataframe 执行的其他操作,例如 count()cache() 可在一分钟内完成

我一直在阅读有关 Spark 的 groupBy 在不同来源上的信息,但从我收集到的 here 中,Dataframe API 不需要加载或随机播放内存中的键,因此即使在大型内存中也不应该成为问题数据框。

我知道在如此大量的数据上使用groupBy 可能需要一些时间,但这确实太多了。我猜可能有一些内存参数需要调整,或者我们执行 groupBy 操作的方式可能有问题?

[编辑] 我忘了提到在 groupBy 之前在 Dataframe 上处理了一些 UDF。我试过了:

groupBy 在没有 UDF 的大型 Dataframe 上:不到一分钟就给出结果 groupBy 在已处理数据帧的样本上:与以前相同的问题

所以我们认为 UDF 是问题的真正原因,而不是 groupBy

【问题讨论】:

【参考方案1】:

首先是一些神话破灭者

    .groupBy('some_column').count().groupBy('some_column').count() 相同

    groupBy 导致随机播放,该帖子的意思是它只随机播放必要的列数据(没有在 groupBy 或 agg 函数中未使用的额外列)

    我一直在不同来源阅读有关 Spark 的 groupBy 的信息,但从我在这里收集的信息来看,Dataframe API 不需要在内存中加载或随机播放键,因此即使在大型 Dataframe 上也不应该成为问题。

现在解决您的问题

    groupBy 可能需要一些时间,如果更多数据被洗牌并且spark.sql.shuffle.partitions 设置为低(默认为 200)。在这种情况下,1 个核心将有大量的混洗数据进行聚合 如果groupBy 中使用的列存在数据倾斜,也可能需要很长时间,因为这会导致大量数据进入单个执行程序核心

解决方案

    spark.sql.shuffle.partitions 增加到更高的值(根据我的经验,应该在 <amount_of_data_shuffled_in_gb>/100MB 左右,以确保 1 个核心获得大约 100 MB 的数据来聚合 可以通过在数据中引入随机性来解决偏差(盐渍)https://dzone.com/articles/why-your-spark-apps-are-slow-or-failing-part-ii-da

【讨论】:

谢谢!我一直在尝试同一数据集的小样本(约 6000 行),结果是相同的,但在 groupBy 之前有一些 UDF 可能会导致倾斜【参考方案2】:

它运行缓慢可能是因为底层 Hive 查询而不是因为 groupBy 操作。您可能知道,spark 会进行惰性评估,因此延迟可能来自上述任何一个。 测试它的一种方法是cache() 数据帧或在对其执行 groupBy 之前调用简单的count()。如果您看到相同的问题,那是因为 hive 查询执行,并且解决方案在那里看起来会有所不同。您还可以尝试从文件中读取数据,看看在执行 groupBy 时是否注意到相同的执行时间。

【讨论】:

感谢您的回答。我可能应该在帖子中提到它,但我们确实在执行groupBy() 之前在数据帧上尝试了count(),它在不到一分钟的时间内给出了结果。我会尝试cache() df 看看它是否有任何改变。

以上是关于大型数据框上的 Pyspark groupBy的主要内容,如果未能解决你的问题,请参考以下文章

数据框上的 Pyspark UDF 列

pyspark 数据框上的自定义函数

如何优化大数据框上的 spark sql 操作?

pyspark/EMR 中大型 DataFrame 上的 collect() 或 toPandas()

错误'AttributeError:'DataFrameGroupBy'对象没有属性'而数据框上的groupby功能

数据框上的年度移动窗口