Spark dataFrame在更新其列后显示时间过长

Posted

技术标签:

【中文标题】Spark dataFrame在更新其列后显示时间过长【英文标题】:Spark dataFrame taking too long to display after updating its columns 【发布时间】:2020-02-19 10:04:44 【问题描述】:

我有一个大约的数据框。 400 万行和 35 列作为输入。

我对这个 dataFrame 所做的只是以下步骤:

对于给定列的列表,我计算给定组特征列表的总和,并将其作为新列加入到我的输入数据帧中 在将每个新列总和加入数据帧后,我立即删除它。

因此,我们最终得到了与我们开始时相同的数据帧(理论上)。

但是,我注意到,如果给定列的列表太大(超过 6 列),则输出数据帧将变得无法操作。即使是简单的展示也需要 10 分钟。

这是我的代码示例(df 是我的输入数据帧):

  for c in list_columns:
    df = df.join(df.groupby(list_group_features).agg(sum(c).alias('sum_' + c)), list_group_features)
    df = df.drop('sum_' + c)

【问题讨论】:

【参考方案1】:

这是由于 Spark 的内部工作原理及其惰性求值造成的。

当您调用 groupbyjoinagg 时,Spark 会做什么,它将这些调用附加到 df 对象的计划中。因此,即使它没有对数据执行任何操作,您也正在创建一个内部存储在 Spark DataFrame 对象中的大型执行计划。

只有当你调用一个动作(showcountwrite等)时,Spark才会优化计划并执行它。如果计划太大,优化步骤可能需要一段时间才能执行。还要记住,计划优化发生在驱动程序上,而不是执行程序上。因此,如果您的驱动程序很忙或超载,它也会延迟火花计划优化步骤。

请记住,连接是 Spark 中昂贵的操作,无论是优化还是执行。如果可以的话,在单个 DataFrame 上操作时应始终避免连接,而应使用窗口功能。仅当您连接来自不同来源(不同表)的不同数据框时,才应使用连接。

优化代码的一种方法是:

import pyspark
import pyspark.sql.functions as f

w = pyspark.sql.Window.partitionBy(list_group_features)
agg_sum_exprs = [f.sum(f.col(c)).alias("sum_" + c).over(w) for c in list_columns]
res_df = df.select(df.columns + agg_sum_exprs)

对于大型 list_group_featureslist_columns 列表,这应该是可扩展且快速的。

【讨论】:

没有人在我看到的问题上注意到这一点,好东西,对新手有用...

以上是关于Spark dataFrame在更新其列后显示时间过长的主要内容,如果未能解决你的问题,请参考以下文章

SPARK SQL - 使用 DataFrames 和 JDBC 更新 MySql 表

如何在 Spark Dataframe 中显示完整的列内容?

Spark DataFrame to Dict - 字典更新序列元素错误

Spark Dataframe - 窗口函数 - 插入和更新输出的滞后和领先

Spark Scala 聚合组 Dataframe

Spark Dataframe 中的过滤操作