如何在不使用 StandardScaler 的情况下标准化 PySpark 中的列?

Posted

技术标签:

【中文标题】如何在不使用 StandardScaler 的情况下标准化 PySpark 中的列?【英文标题】:How to standardize a column in PySpark without using StandardScaler? 【发布时间】:2018-08-09 21:31:10 【问题描述】:

看起来应该可以,但我遇到了错误:

mu =  mean(df[input])
sigma = stddev(df[input])
dft = df.withColumn(output, (df[input]-mu)/sigma)
pyspark.sql.utils.AnalysisException: "grouping expressions sequence is
empty, and '`user`' is not an aggregate function. Wrap
'(((CAST(`sum(response)` AS DOUBLE) - avg(`sum(response)`)) /
stddev_samp(CAST(`sum(response)` AS DOUBLE))) AS `scaled`)' in
windowing function(s) or wrap '`user`' in first() (or first_value) if
you don't care which value you get.;;\nAggregate [user#0,
sum(response)#26L, ((cast(sum(response)#26L as double) -
avg(sum(response)#26L)) / stddev_samp(cast(sum(response)#26L as
double))) AS scaled#46]\n+- AnalysisBarrier\n      +- Aggregate
[user#0], [user#0, sum(cast(response#3 as bigint)) AS
sum(response)#26L]\n         +- Filter item_id#1 IN
(129,130,131,132,133,134,135,136,137,138)\n            +-
Relation[user#0,item_id#1,response_value#2,response#3,trait#4,response_timestamp#5] 
csv\n"

我不确定这条错误消息是怎么回事。

【问题讨论】:

这只适用于向量。 【参考方案1】:

使用collect() 通常不是一个好的解决方案,您会发现这不会随着数据的增长而扩展。

如果您不想使用StandardScaler,更好的方法是使用Window 来计算均值和标准差。

借用StandardScaler in Spark not working as expected的同一个例子:

from pyspark.sql.functions import col, mean, stddev
from pyspark.sql import Window

df = spark.createDataFrame(
    np.array(range(1,10,1)).reshape(3,3).tolist(),
    ["int1", "int2", "int3"]
)
df.show()
#+----+----+----+
#|int1|int2|int3|
#+----+----+----+
#|   1|   2|   3|
#|   4|   5|   6|
#|   7|   8|   9|
#+----+----+----+

假设您想标准化列int2

input_col = "int2"
output_col = "int2_scaled"

w = Window.partitionBy()

mu = mean(input_col).over(w)
sigma = stddev(input_col).over(w)

df.withColumn(output_col, (col(input_col) - mu)/(sigma)).show()
#+----+----+----+-----------+
#|int1|int2|int3|int2_scaled|
#+----+----+----+-----------+
#|   1|   2|   3|       -1.0|
#|   7|   8|   9|        1.0|
#|   4|   5|   6|        0.0|
#+----+----+----+-----------+

如果您想像其他示例一样使用总体标准差,请将pyspark.sql.functions.stddev 替换为pyspark.sql.functions.stddev_pop()

【讨论】:

好答案!谢谢。【参考方案2】:

幸运的是,我能够找到有效的代码:

summary =  df.select([mean(input).alias('mu'), stddev(input).alias('sigma')])\
    .collect().pop()
dft = df.withColumn(output, (df[input]-summary.mu)/summary.sigma)

【讨论】:

@pault 随意提供您认为更好的答案。

以上是关于如何在不使用 StandardScaler 的情况下标准化 PySpark 中的列?的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 StandardScaler 使用 fit 和 transform 训练和测试数据

如何在 PySpark 中使用 StandardScaler 标准化测试数据集?

sklearn.preprocessing.StandardScaler 离线使用 不使用pickle如何做

如何在列子集上实现 PySpark StandardScaler?

如何解决 StandardScaler 值错误

当机器学习模型标准化时如何预测新值 StandardScaler