PySpark:我们应该迭代更新数据帧吗?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PySpark:我们应该迭代更新数据帧吗?相关的知识,希望对你有一定的参考价值。

我的问题有两个部分。第一个是理解Spark的工作方式,第二个是优化。

我有一个火花数据框,它有多个分类变量。对于这些分类变量中的每一个,我添加一个新列,其中每一行是相应级别的频率。

例如

Date_Built  Square_Footage  Num_Beds    Num_Baths   State   Price     Freq_State
01/01/1920  1700            3           2           NY      700000    4500

这里为State(一个分类变量),我添加了一个新的变量Freq_State。级别NY在数据集中出现4500次,因此该行在4500列中获得Freq_State

我有多个这样的列,我添加一个相应级别的频率列。

这是我用来实现这个目的的代码

def calculate_freq(df, categorical_cols):
    for each_cat_col in categorical_cols:
        _freq = df.select(each_cat_col).groupBy(each_cat_col).count()
        df = df.join(_freq, each_cat_col, "inner")
    return df

第1部分

在这里,正如您所看到的,我正在更新for循环中的数据帧。当我在群集上运行此代码时,这种更新数据帧的方式是否可取?如果它是熊猫数据帧,我不会担心这个。但我不确定上下文何时变为火花。

另外,如果我只是在循环中而不是在函数内部运行上述过程,它会有所作为吗?

第2部分

是否有更优化的方法来做到这一点?我每次进入循环时都会加入这里?可以避免这种情况

答案

是否有更优化的方法来做到这一点?

有哪些替代方案?

  1. 您可以使用Window函数 def calculate_freq(df, categorical_cols): for cat_col in categorical_cols: w = Window.partitionBy(cat_col) df = df.withColumn("{}_freq".format(each_cat_col), count("*").over(w)) return df 你应该?不。与join不同,它总是需要完全洗牌的非聚合的DataFrame
  2. 您可以使用melt并使用单个本地对象(这要求所有分类列具有相同的类型): from itertools import groupby for c in categorical_cols: df = df.withColumn(c, df[c].cast("string")) rows = (melt(df, id_vars=[], value_vars=categorical_cols) .groupBy("variable", "value").count().collect()) mapping = {k: {x.value: x["count"] for x in v} for k, v in groupby(sorted(rows), lambda x: x.variable)} 并使用udf添加值: from pyspark.sql.functions import udf def get_count(mapping_c): @udf("bigint") def _(x): return mapping_c.get(x) return _ for c in categorical_cols: df = df.withColumn("{}_freq".format(c), get_count(mapping[c])(c)) 你应该?也许。与迭代连接不同,它只需要一个动作来计算所有统计信息。如果结果很小(预期有分类变量),您可以获得适度的性能提升。
  3. 添加broadcast提示。 from pyspark.sql.functions import broadcast def calculate_freq(df, categorical_cols): for each_cat_col in categorical_cols: _freq = df.select(each_cat_col).groupBy(each_cat_col).count() df = df.join(broadcast(_freq), each_cat_col, "inner") return df Spark应该自动广播,所以它不应该改变一件事,但帮助规划者总是更好。

另外,如果我只是在循环中而不是在函数内部运行上述过程,它会有所作为吗?

它忽略了代码的可维护性和可测试性。

以上是关于PySpark:我们应该迭代更新数据帧吗?的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 在查找前一行时按组迭代数据帧

Pyspark 数据框:访问列(TypeError:列不可迭代)

Pandas对Pyspark函数的迭代。

# 字符串方法 TypeError: 列在 pyspark 中不可迭代

如何在 PySpark 中为一个组迭代 Dataframe / RDD 的每一行。?

PySpark 无法通过 sparkContext/hiveContext 读取 Hive ORC 事务表?我们可以使用 Pyspark 更新/删除配置单元表数据吗?