大型数据框上的 Pyspark groupBy
Posted
技术标签:
【中文标题】大型数据框上的 Pyspark groupBy【英文标题】:Pyspark groupBy on large dataframe 【发布时间】:2020-12-16 01:41:09 【问题描述】:我们有一个非常大的 Pyspark Dataframe,我们需要对其执行 groupBy 操作。
我们已经尝试过
df_gp=df.groupBy('some_column').count()
而且它需要很长时间(它已经运行了超过 17 小时没有结果)。
我也试过
df_gp=df.groupBy('some_column').agg(count)
但据我所知,行为是相同的。
更多上下文:
我们正在使用 %spark2.pyspark 解释器在 Zeppelin(版本 0.8.0)上运行此操作 Zeppelin 正在 Yarn 客户端上运行 数据存储在 Hive (Hive 3.1.0.3.1.0.0-78) 初始数据帧是通过使用 llap 查询 Hive 创建的:from pyspark_llap import HiveWarehouseSession
hive = HiveWarehouseSession.session(spark).build()
req=""" SELECT *
FROM table
where isodate='2020-07-27'
"""
df = hive.executeQuery(req)
数据框大小约为 6000 万行,9 列
在同一环境中对同一 Dataframe 执行的其他操作,例如 count()
或 cache()
可在一分钟内完成
我一直在阅读有关 Spark 的 groupBy
在不同来源上的信息,但从我收集到的 here 中,Dataframe API 不需要加载或随机播放内存中的键,因此即使在大型内存中也不应该成为问题数据框。
我知道在如此大量的数据上使用groupBy
可能需要一些时间,但这确实太多了。我猜可能有一些内存参数需要调整,或者我们执行 groupBy 操作的方式可能有问题?
[编辑] 我忘了提到在 groupBy
之前在 Dataframe 上处理了一些 UDF。我试过了:
groupBy
在没有 UDF 的大型 Dataframe 上:不到一分钟就给出结果
groupBy
在已处理数据帧的样本上:与以前相同的问题
所以我们认为 UDF 是问题的真正原因,而不是 groupBy
【问题讨论】:
【参考方案1】:首先是一些神话破灭者
.groupBy('some_column').count()
和 .groupBy('some_column').count()
相同
groupBy
导致随机播放,该帖子的意思是它只随机播放必要的列数据(没有在 groupBy 或 agg 函数中未使用的额外列)
我一直在不同来源阅读有关 Spark 的 groupBy 的信息,但从我在这里收集的信息来看,Dataframe API 不需要在内存中加载或随机播放键,因此即使在大型 Dataframe 上也不应该成为问题。
现在解决您的问题
groupBy
可能需要一些时间,如果更多数据被洗牌并且spark.sql.shuffle.partitions
设置为低(默认为 200)。在这种情况下,1 个核心将有大量的混洗数据进行聚合
如果groupBy
中使用的列存在数据倾斜,也可能需要很长时间,因为这会导致大量数据进入单个执行程序核心
解决方案
-
将
spark.sql.shuffle.partitions
增加到更高的值(根据我的经验,应该在 <amount_of_data_shuffled_in_gb>/100MB
左右,以确保 1 个核心获得大约 100 MB 的数据来聚合
可以通过在数据中引入随机性来解决偏差(盐渍)https://dzone.com/articles/why-your-spark-apps-are-slow-or-failing-part-ii-da
【讨论】:
谢谢!我一直在尝试同一数据集的小样本(约 6000 行),结果是相同的,但在groupBy
之前有一些 UDF 可能会导致倾斜【参考方案2】:
它运行缓慢可能是因为底层 Hive 查询而不是因为 groupBy
操作。您可能知道,spark 会进行惰性评估,因此延迟可能来自上述任何一个。
测试它的一种方法是cache()
数据帧或在对其执行 groupBy 之前调用简单的count()
。如果您看到相同的问题,那是因为 hive 查询执行,并且解决方案在那里看起来会有所不同。您还可以尝试从文件中读取数据,看看在执行 groupBy 时是否注意到相同的执行时间。
【讨论】:
感谢您的回答。我可能应该在帖子中提到它,但我们确实在执行groupBy()
之前在数据帧上尝试了count()
,它在不到一分钟的时间内给出了结果。我会尝试cache()
df 看看它是否有任何改变。以上是关于大型数据框上的 Pyspark groupBy的主要内容,如果未能解决你的问题,请参考以下文章
pyspark/EMR 中大型 DataFrame 上的 collect() 或 toPandas()