PySpark:我们应该迭代更新数据帧吗?
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了PySpark:我们应该迭代更新数据帧吗?相关的知识,希望对你有一定的参考价值。
我的问题有两个部分。第一个是理解Spark的工作方式,第二个是优化。
我有一个火花数据框,它有多个分类变量。对于这些分类变量中的每一个,我添加一个新列,其中每一行是相应级别的频率。
例如
Date_Built Square_Footage Num_Beds Num_Baths State Price Freq_State
01/01/1920 1700 3 2 NY 700000 4500
这里为State
(一个分类变量),我添加了一个新的变量Freq_State
。级别NY
在数据集中出现4500
次,因此该行在4500
列中获得Freq_State
。
我有多个这样的列,我添加一个相应级别的频率列。
这是我用来实现这个目的的代码
def calculate_freq(df, categorical_cols):
for each_cat_col in categorical_cols:
_freq = df.select(each_cat_col).groupBy(each_cat_col).count()
df = df.join(_freq, each_cat_col, "inner")
return df
第1部分
在这里,正如您所看到的,我正在更新for
循环中的数据帧。当我在群集上运行此代码时,这种更新数据帧的方式是否可取?如果它是熊猫数据帧,我不会担心这个。但我不确定上下文何时变为火花。
另外,如果我只是在循环中而不是在函数内部运行上述过程,它会有所作为吗?
第2部分
是否有更优化的方法来做到这一点?我每次进入循环时都会加入这里?可以避免这种情况
是否有更优化的方法来做到这一点?
有哪些替代方案?
- 您可以使用Window函数
def calculate_freq(df, categorical_cols): for cat_col in categorical_cols: w = Window.partitionBy(cat_col) df = df.withColumn("{}_freq".format(each_cat_col), count("*").over(w)) return df
你应该?不。与join
不同,它总是需要完全洗牌的非聚合的DataFrame
。 - 您可以使用
melt
并使用单个本地对象(这要求所有分类列具有相同的类型):from itertools import groupby for c in categorical_cols: df = df.withColumn(c, df[c].cast("string")) rows = (melt(df, id_vars=[], value_vars=categorical_cols) .groupBy("variable", "value").count().collect()) mapping = {k: {x.value: x["count"] for x in v} for k, v in groupby(sorted(rows), lambda x: x.variable)}
并使用udf
添加值:from pyspark.sql.functions import udf def get_count(mapping_c): @udf("bigint") def _(x): return mapping_c.get(x) return _ for c in categorical_cols: df = df.withColumn("{}_freq".format(c), get_count(mapping[c])(c))
你应该?也许。与迭代连接不同,它只需要一个动作来计算所有统计信息。如果结果很小(预期有分类变量),您可以获得适度的性能提升。 - 添加
broadcast
提示。from pyspark.sql.functions import broadcast def calculate_freq(df, categorical_cols): for each_cat_col in categorical_cols: _freq = df.select(each_cat_col).groupBy(each_cat_col).count() df = df.join(broadcast(_freq), each_cat_col, "inner") return df
Spark应该自动广播,所以它不应该改变一件事,但帮助规划者总是更好。
另外,如果我只是在循环中而不是在函数内部运行上述过程,它会有所作为吗?
它忽略了代码的可维护性和可测试性。
以上是关于PySpark:我们应该迭代更新数据帧吗?的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark 数据框:访问列(TypeError:列不可迭代)
# 字符串方法 TypeError: 列在 pyspark 中不可迭代
如何在 PySpark 中为一个组迭代 Dataframe / RDD 的每一行。?
PySpark 无法通过 sparkContext/hiveContext 读取 Hive ORC 事务表?我们可以使用 Pyspark 更新/删除配置单元表数据吗?