如何在 Spark SQL 中为每个组创建 z 分数

Posted

技术标签:

【中文标题】如何在 Spark SQL 中为每个组创建 z 分数【英文标题】:How to create a z-score in Spark SQL for each group 【发布时间】:2016-04-23 07:23:49 【问题描述】:

我有一个看起来像这样的数据框

        dSc     TranAmount
 1: 100021      79.64
 2: 100021      79.64
 3: 100021       0.16
 4: 100022      11.65
 5: 100022       0.36
 6: 100022       0.47
 7: 100025       0.17
 8: 100037       0.27
 9: 100056       0.27
10: 100063       0.13
11: 100079       0.13
12: 100091       0.15
13: 100101       0.22
14: 100108       0.14
15: 100109       0.04

现在我想创建第三列,其中包含每个 TranAmount 的 z 分数,这将是

(TranAmount-mean(TranAmount))/StdDev(TranAmount)

这里的平均值和标准差将基于每个 dSc 的组

现在我可以在 Spark SQL 中计算平均值和标准差。

(datafromdb
  .groupBy("dSc")
  .agg(datafromdb.dSc, func.avg("TranAmount") ,func.stddev_pop("TranAmount")))

但我不知道如何在数据框中使用 z 分数实现第三列。 我将不胜感激任何指向实现此目标的正确方法的指针/

【问题讨论】:

【参考方案1】:

例如,您可以使用原始数据计算统计数据和join

stats = (df.groupBy("dsc")
  .agg(
      func.stddev_pop("TranAmount").alias("sd"), 
      func.avg("TranAmount").alias("avg")))

df.join(broadcast(stats), ["dsc"])

(df
    .join(func.broadcast(stats), ["dsc"])
    .select("dsc", "TranAmount", (df.TranAmount - stats.avg) / stats.sd))

或者使用窗口函数with standard deviation formula:

from pyspark.sql.window import Window
import sys

def z_score_w(col, w):
    avg_ = func.avg(col).over(w)
    avg_sq = func.avg(col * col).over(w)
    sd_ = func.sqrt(avg_sq - avg_ * avg_)
    return (col - avg_) / sd_

w = Window().partitionBy("dsc").rowsBetween(-sys.maxsize, sys.maxsize)
df.withColumn("zscore", z_score_w(df.TranAmount, w))

【讨论】:

我不太明白rowsBetween(-sys.maxsize, sys.maxsize) 部分 相当于ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING子句。

以上是关于如何在 Spark SQL 中为每个组创建 z 分数的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 数据框的列中为每个组添加递增的数字

在 spark scala 中为数据帧中的每个组采样不同数量的随机行

如何在 MySQL 中为每个类别创建一个 SQL 窗口函数列?

如何在 Spark 中创建一组 ngram?

spark中将每个组作为新数据帧并在循环中传递另一个函数的最佳方法是啥?

如何在Linux bash中为每个用户在一行中显示组下的用户