窗口上的 Pyspark StandardScaler

Posted

技术标签:

【中文标题】窗口上的 Pyspark StandardScaler【英文标题】:Pyspark StandardScaler over a Window 【发布时间】:2020-09-21 16:51:04 【问题描述】:

我想在我的数据窗口上使用标准缩放器pyspark.ml.feature.StandardScaler

df4=spark.createDataFrame(
    [
        (1,1, 'X', 'a'),
        (2,1, 'X', 'a'),
        (3,9, 'X', 'b'),
        (5,1, 'X', 'b'),
        (6,2, 'X', 'c'),
        (7,2, 'X', 'c'),
        (8,10, 'Y', 'a'),
        (9,45, 'Y', 'a'),
        (10,3, 'Y', 'a'),
        (11,3, 'Y', 'b'),
        (12,6, 'Y', 'b'),
        (13,19,'Y', 'b')
    ],
    ['id','feature', 'txt', 'cat'] 
)

w = Window().partitionBy(..)

我可以通过调用 .fit& .transform 方法对整个数据框执行此操作。但不是在我们通常使用的w 变量上,例如F.col('feature') - F.mean('feature').over(w)

我可以将所有窗口化/分组数据转换为单独的列,将其放入数据框中,然后对其应用 StandardScaler 并转换回1D。还有其他方法吗?最终目标是尝试不同的缩放器,包括pyspark.ml.feature.RobustScaler

【问题讨论】:

【参考方案1】:

我最终不得不编写自己的缩放器类。在上述问题中使用pyspark StandardScaler 是不合适的,因为我们都知道端到端系列转换更有效。尽管如此,我还是想出了自己的缩放器。它并没有真正使用来自 pyspark 的 Window,但我使用 groupby 实现了该功能。

class StandardScaler:
    
    tol = 0.000001
    
    def __init__(self, colsTotransform, groupbyCol='txt', orderBycol='id'):
        self.colsTotransform = colsTotransform
        self.groupbyCol=groupbyCol
        self.orderBycol=orderBycol
    
    def __tempNames__(self):
        return [(f"colname_transformed",colname) for colname in self.colsTotransform]
    
    def fit(self, df):
        funcs = [(F.mean(name), F.stddev(name)) for name in self.colsTotransform]
        exprs = [ff for tup in funcs for ff in tup]
        self.stats = df.groupBy([self.groupbyCol]).agg(*exprs)
    
    def __transformOne__(self, df_with_stats, newName, colName):
        return df_with_stats\
                .withColumn(newName, 
                            (F.col(colName)-F.col(f'avg(colName)'))/(F.col(f'stddev_samp(colName)')+self.tol))\
                .drop(colName)\
                .withColumnRenamed(newName, colName)

    def transform(self, df):
        df_with_stats = df.join(self.stats, on=self.groupbyCol, how='inner').orderBy(self.orderBycol)
        return reduce(lambda df_with_stats, kv: self.__transformOne__(df_with_stats, *kv), 
                       self.__tempNames__(), df_with_stats)[df.columns]
    
   

用法:

ss = StandardScaler(colsTotransform=['feature'],groupbyCol='txt',orderbyCol='id')
ss.fit(df4)
ss.stats.show()

+---+------------------+--------------------+
|txt|      avg(feature)|stddev_samp(feature)|
+---+------------------+--------------------+
|  Y|14.333333333333334|  16.169930941926335|
|  X|2.6666666666666665|  3.1411250638372654|
+---+------------------+--------------------+

df4.show()

+---+-------+---+---+
| id|feature|txt|cat|
+---+-------+---+---+
|  1|      1|  X|  a|
|  2|      1|  X|  a|
|  3|      9|  X|  b|
|  5|      1|  X|  b|
|  6|      2|  X|  c|
|  7|      2|  X|  c|
|  8|     10|  Y|  a|
|  9|     45|  Y|  a|
| 10|      3|  Y|  a|
| 11|      3|  Y|  b|
| 12|      6|  Y|  b|
| 13|     19|  Y|  b|
+---+-------+---+---+

ss.transform(df4).show()
+---+--------------------+---+---+
| id|             feature|txt|cat|
+---+--------------------+---+---+
|  1|  -0.530595281053646|  X|  a|
|  2|  -0.530595281053646|  X|  a|
|  3|  2.0162620680038548|  X|  b|
|  5|  -0.530595281053646|  X|  b|
|  6|-0.21223811242145835|  X|  c|
|  7|-0.21223811242145835|  X|  c|
|  8| -0.2679871102053074|  Y|  a|
|  9|  1.8965241645298676|  Y|  a|
| 10| -0.7008893651523425|  Y|  a|
| 11| -0.7008893651523425|  Y|  b|
| 12| -0.5153598273178989|  Y|  b|
| 13|  0.2886015032980233|  Y|  b|
+---+--------------------+---+---+

【讨论】:

以上是关于窗口上的 Pyspark StandardScaler的主要内容,如果未能解决你的问题,请参考以下文章

PySpark - 窗口函数导致新列

在 PySpark 中随时间窗口聚合

带有窗口函数的 PySpark 数据偏度

Pyspark:在窗口内使用 udf

PySpark 结构化流将 udf 应用于窗口

PySpark 数据框条件按窗口/滞后