窗口上的 Pyspark StandardScaler
Posted
技术标签:
【中文标题】窗口上的 Pyspark StandardScaler【英文标题】:Pyspark StandardScaler over a Window 【发布时间】:2020-09-21 16:51:04 【问题描述】:我想在我的数据窗口上使用标准缩放器pyspark.ml.feature.StandardScaler
。
df4=spark.createDataFrame(
[
(1,1, 'X', 'a'),
(2,1, 'X', 'a'),
(3,9, 'X', 'b'),
(5,1, 'X', 'b'),
(6,2, 'X', 'c'),
(7,2, 'X', 'c'),
(8,10, 'Y', 'a'),
(9,45, 'Y', 'a'),
(10,3, 'Y', 'a'),
(11,3, 'Y', 'b'),
(12,6, 'Y', 'b'),
(13,19,'Y', 'b')
],
['id','feature', 'txt', 'cat']
)
w = Window().partitionBy(..)
我可以通过调用 .fit
& .transform
方法对整个数据框执行此操作。但不是在我们通常使用的w
变量上,例如F.col('feature') - F.mean('feature').over(w)
。
我可以将所有窗口化/分组数据转换为单独的列,将其放入数据框中,然后对其应用 StandardScaler 并转换回1D
。还有其他方法吗?最终目标是尝试不同的缩放器,包括pyspark.ml.feature.RobustScaler
。
【问题讨论】:
【参考方案1】:我最终不得不编写自己的缩放器类。在上述问题中使用pyspark StandardScaler
是不合适的,因为我们都知道端到端系列转换更有效。尽管如此,我还是想出了自己的缩放器。它并没有真正使用来自 pyspark 的 Window
,但我使用 groupby
实现了该功能。
class StandardScaler:
tol = 0.000001
def __init__(self, colsTotransform, groupbyCol='txt', orderBycol='id'):
self.colsTotransform = colsTotransform
self.groupbyCol=groupbyCol
self.orderBycol=orderBycol
def __tempNames__(self):
return [(f"colname_transformed",colname) for colname in self.colsTotransform]
def fit(self, df):
funcs = [(F.mean(name), F.stddev(name)) for name in self.colsTotransform]
exprs = [ff for tup in funcs for ff in tup]
self.stats = df.groupBy([self.groupbyCol]).agg(*exprs)
def __transformOne__(self, df_with_stats, newName, colName):
return df_with_stats\
.withColumn(newName,
(F.col(colName)-F.col(f'avg(colName)'))/(F.col(f'stddev_samp(colName)')+self.tol))\
.drop(colName)\
.withColumnRenamed(newName, colName)
def transform(self, df):
df_with_stats = df.join(self.stats, on=self.groupbyCol, how='inner').orderBy(self.orderBycol)
return reduce(lambda df_with_stats, kv: self.__transformOne__(df_with_stats, *kv),
self.__tempNames__(), df_with_stats)[df.columns]
用法:
ss = StandardScaler(colsTotransform=['feature'],groupbyCol='txt',orderbyCol='id')
ss.fit(df4)
ss.stats.show()
+---+------------------+--------------------+
|txt| avg(feature)|stddev_samp(feature)|
+---+------------------+--------------------+
| Y|14.333333333333334| 16.169930941926335|
| X|2.6666666666666665| 3.1411250638372654|
+---+------------------+--------------------+
df4.show()
+---+-------+---+---+
| id|feature|txt|cat|
+---+-------+---+---+
| 1| 1| X| a|
| 2| 1| X| a|
| 3| 9| X| b|
| 5| 1| X| b|
| 6| 2| X| c|
| 7| 2| X| c|
| 8| 10| Y| a|
| 9| 45| Y| a|
| 10| 3| Y| a|
| 11| 3| Y| b|
| 12| 6| Y| b|
| 13| 19| Y| b|
+---+-------+---+---+
ss.transform(df4).show()
+---+--------------------+---+---+
| id| feature|txt|cat|
+---+--------------------+---+---+
| 1| -0.530595281053646| X| a|
| 2| -0.530595281053646| X| a|
| 3| 2.0162620680038548| X| b|
| 5| -0.530595281053646| X| b|
| 6|-0.21223811242145835| X| c|
| 7|-0.21223811242145835| X| c|
| 8| -0.2679871102053074| Y| a|
| 9| 1.8965241645298676| Y| a|
| 10| -0.7008893651523425| Y| a|
| 11| -0.7008893651523425| Y| b|
| 12| -0.5153598273178989| Y| b|
| 13| 0.2886015032980233| Y| b|
+---+--------------------+---+---+
【讨论】:
以上是关于窗口上的 Pyspark StandardScaler的主要内容,如果未能解决你的问题,请参考以下文章