如何将列除以 Spark DataFrame 中的总和
Posted
技术标签:
【中文标题】如何将列除以 Spark DataFrame 中的总和【英文标题】:How to divide a column by its sum in a Spark DataFrame 【发布时间】:2018-01-31 22:43:41 【问题描述】:如何在 Spark DataFrame 中高效且不立即触发计算,将一列除以它自己的总和?
假设我们有一些数据:
import pyspark
from pyspark.sql import SparkSession, Window
import pyspark.sql.functions as spf
spark = SparkSession.builder.master('local').getOrCreate()
data = spark.range(0, 100)
data # --> DataFrame[id: bigint]
我想在这个数据框上创建一个名为“规范化”的新列,其中包含id / sum(id)
。一种方法是预先计算总和,如下所示:
s = data.select(spf.sum('id')).collect()[0][0]
data2 = data.withColumn('normalized', spf.col('id') / s)
data2 # --> DataFrame[id: bigint, normalized: double]
这很好,但它会立即触发计算;如果您为许多列定义类似的内容,则会导致数据的多次冗余传递。
另一种方法是使用包含整个表格的窗口规范:
w = Window.rowsBetween(Window.unboundedPreceding, Window.unboundedFollowing)
data3 = data.withColumn('normalized', spf.col('id') / spf.sum('id').over(w))
data3 # --> DataFrame[id: bigint, normalized: double]
在这种情况下,可以定义data3
,但是一旦您尝试实际计算它,Spark 2.2.0 会将所有数据移动到单个分区中,这通常会导致大型数据集的作业失败。
还有哪些其他方法可以解决这个问题,不会触发立即计算并且适用于大型数据集?我对任何解决方案都感兴趣,不一定是基于pyspark
的解决方案。
【问题讨论】:
【参考方案1】:crossJoin
聚合是一种方法:
data.crossJoin(
data.select(spf.sum('id').alias("sum_id"))
).withColumn("normalized", spf.col("id") / spf.col("sum_id"))
但我不会太担心:
这很好,但它会立即触发计算;如果您为许多列定义类似的内容,则会导致数据的多次冗余传递。
一次计算多个统计数据:
data2 = data.select(spf.rand(42).alias("x"), spf.randn(42).alias("y"))
mean_x, mean_y = data2.groupBy().mean().first()
剩下的只是对局部表达式的操作:
data2.select(spf.col("x") - mean_x, spf.col("y") - mean_y)
【讨论】:
以上是关于如何将列除以 Spark DataFrame 中的总和的主要内容,如果未能解决你的问题,请参考以下文章
Pandas: Grouped DataFrame - 将列的值除以每个组该列中某一行的值