PySpark:一步计算平均值、标准差和平均值附近的值

Posted

技术标签:

【中文标题】PySpark:一步计算平均值、标准差和平均值附近的值【英文标题】:PySpark: calculate mean, standard deviation and those values around the mean in one step 【发布时间】:2016-07-08 12:45:30 【问题描述】:

我的原始数据采用表格格式。它包含来自不同变量的观察结果。每个观察值都有变量名、时间戳和当时的值。

变量[字符串]、时间[日期时间]、值[浮点数]

数据以 Parquet 形式存储在 HDFS 中并加载到 Spark 数据帧 (df) 中。来自那个数据框。

现在我想为每个变量计算默认统计数据,例如平均值、标准偏差等。之后,一旦检索到平均值,我想过滤/计算那些与平均值非常接近的变量的值。

由于对我的other question 的回答,我想出了这个代码:

from pyspark.sql.window import Window
from pyspark.sql.functions import *
from pyspark.sql.types import *

w1 = Window().partitionBy("Variable")
w2 = Window.partitionBy("Variable").orderBy("Time")

def stddev_pop_w(col, w):
    #Built-in stddev doesn't support windowing
    return sqrt(avg(col * col).over(w) - pow(avg(col).over(w), 2))

def isInRange(value, mean, stddev, radius):
    try:
        if (abs(value - mean) < radius * stddev):
            return 1
        else:
            return 0
    except AttributeError:
        return -1

delta = col("Time").cast("long") - lag("Time", 1).over(w2).cast("long")
#f = udf(lambda (value, mean, stddev, radius): abs(value - mean) < radius * stddev, IntegerType())
#f2 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 2), IntegerType())
#f3 = udf(lambda value, mean, stddev: isInRange(value, mean, stddev, 3), IntegerType())

df_ = df_all \
    .withColumn("mean", mean("Value").over(w1)) \
    .withColumn("std_deviation", stddev_pop_w(col("Value"), w1)) \
    .withColumn("delta", delta) \
#    .withColumn("stddev_2", f2("Value", "mean", "std_deviation")) \
#    .withColumn("stddev_3", f3("Value", "mean", "std_deviation")) \

#df2.show(5, False)

问题:最后两条注释行不起作用。它将给出一个 AttributeError,因为 stddev 和 mean 的传入值为 null。我想这是因为我指的是那些也只是动态计算并且当时没有价值的列。 但是有没有办法做到这一点?

目前我正在像这样进行第二次运行:

df = df_.select("*", \
    abs(df_.Value - df_.mean).alias("max_deviation_mean"), \
    when(abs(df_.Value - df_.mean) < 2 * df_.std_deviation, 1).otherwise(1).alias("std_dev_mean_2"), \
    when(abs(df_.Value - df_.mean) < 3 * df_.std_deviation, 1).otherwise(1).alias("std_dev_mean_3"))

【问题讨论】:

【参考方案1】:

解决方案是使用 DataFrame.aggregateByKey 函数聚合每个分区和节点的值,然后在计算节点周围混洗聚合,然后将它们组合成一个结果值。

伪代码看起来像这样。它的灵感来自this tutorial,但它使用了 StatCounter 的两个实例,尽管我们同时汇总了两个不同的统计数据:

from pyspark.statcounter import StatCounter
# value[0] is the timestamp and value[1] is the float-value
# we are using two instances of StatCounter to sum-up two different statistics

def mergeValues(s1, v1, s2, v2):
    s1.merge(v1)
    s2.merge(v2)
    return

def combineStats(s1, s2):
    s1[0].mergeStats(s2[0])
    s1[1].mergeStats(s2[1])
    return
(df.aggregateByKey((StatCounter(), StatCounter()),
        (lambda s, values: mergeValues(s[0], values[0], s[1], values[1]),
        (lambda s1, s2: combineStats(s1, s2))
    .mapValues(lambda s: (  s[0].min(), s[0].max(), s[1].max(), s[1].min(), s[1].mean(), s[1].variance(), s[1].stddev,() s[1].count()))
    .collect())

【讨论】:

【参考方案2】:

这不起作用,因为当您执行时

from pyspark.sql.functions import *

你使用 pyspark.sql.functions.abs 隐藏内置 abs,它需要一个列而不是本地 Python 值作为输入。

您创建的 UDF 也不处理 NULL 条目。

不要使用import *,除非您知道究竟导入了什么。而是别名

from pyspark.sql.functions import abs as abs_

或导入模块

from pyspark.sql import functions as sqlf

sqlf.col("x")

始终检查 UDF 内的输入,除非必要,否则最好避免使用 UDF。

【讨论】:

你是说当我改变导入时它会起作用? 我是说这和缺少 NULL / None 处理是明显的问题。那里可能还有其他问题。

以上是关于PySpark:一步计算平均值、标准差和平均值附近的值的主要内容,如果未能解决你的问题,请参考以下文章

如何根据随机分布数据计算 C++ 中的样本均值、标准差和方差,并与原始均值和 sigma 进行比较

我正在尝试计算 R 中每年的标准差和平均回报

请计算数据组"10,7,10,8,10,10,7,6"的均值,中位数,众数,标准差和极差

方差标准差和协方差三者之间的定义与计算

如何有效地计算pyspark中的平均值和标准差

如何求标准差的置信区间