如何在列子集上实现 PySpark StandardScaler?

Posted

技术标签:

【中文标题】如何在列子集上实现 PySpark StandardScaler?【英文标题】:How to implement PySpark StandardScaler on subset of columns? 【发布时间】:2021-01-20 23:17:33 【问题描述】:

我想在我的数据框中的 10 列中的 6 列上使用 pyspark StandardScaler。这将是管道的一部分。

inputCol 参数似乎需要一个向量,我可以在对所有特征使用 VectorAssembler 后传入它,但这会缩放所有 10 个特征。我不想缩放其他 4 个特征,因为它们是二元的,我想要它们的非标准化系数。

我是否应该在 6 个特征上使用矢量汇编器,对它们进行缩放,然后在这个缩放的特征向量和剩余的 4 个特征上再次使用矢量汇编器?我最终会得到一个向量中的一个向量,但我不确定这是否可行。

这样做的正确方法是什么?一个例子值得赞赏。

【问题讨论】:

您好,我也遇到了这个问题。你是怎么解决的? 【参考方案1】:

您可以使用 VectorAssembler 来做到这一点。他们的关键是您必须从汇编器输出中提取列。请参阅下面的代码以获取工作示例,

from pyspark.ml.feature import MinMaxScaler, StandardScaler
from pyspark.ml.feature import VectorAssembler
import pandas as pd
import numpy as np
import random

df = pd.DataFrame()
df['a'] = random.sample(range(100), 10)
df['b'] = random.sample(range(100), 10)
df['c'] = random.sample(range(100), 10)
df['d'] = random.sample(range(100), 10)
df['e'] = random.sample(range(100), 10)

sdf = sc.createDataFrame(df)

sdf.show()

+---+---+---+---+---+
|  a|  b|  c|  d|  e|
+---+---+---+---+---+
| 51| 13|  6|  5| 26|
| 18| 29| 19| 81| 28|
| 34|  1| 36| 57| 87|
| 56| 86| 51| 52| 48|
| 36| 49| 33| 15| 54|
| 87| 53| 47| 89| 85|
|  7| 14| 55| 13| 98|
| 70| 50| 32| 39| 58|
| 80| 20| 25| 54| 37|
| 40| 33| 44| 83| 27|
+---+---+---+---+---+

cols_to_scale = ['c', 'd', 'e']
cols_to_keep_unscaled = ['a', 'b']

scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures")
assembler = VectorAssembler().setInputCols(cols_to_scale).setOutputCol("features")
sdf_transformed = assembler.transform(sdf)
scaler_model = scaler.fit(sdf_transformed.select("features"))
sdf_scaled = scaler_model.transform(sdf_transformed)

sdf_scaled.show()

+---+---+---+---+---+----------------+--------------------+
|  a|  b|  c|  d|  e|        features|      scaledFeatures|
+---+---+---+---+---+----------------+--------------------+
| 51| 13|  6|  5| 26|  [6.0,5.0,26.0]|[0.39358015146628...|
| 18| 29| 19| 81| 28|[19.0,81.0,28.0]|[1.24633714630991...|
| 34|  1| 36| 57| 87|[36.0,57.0,87.0]|[2.36148090879773...|
| 56| 86| 51| 52| 48|[51.0,52.0,48.0]|[3.34543128746345...|
| 36| 49| 33| 15| 54|[33.0,15.0,54.0]|[2.16469083306459...|
| 87| 53| 47| 89| 85|[47.0,89.0,85.0]|[3.08304451981926...|
|  7| 14| 55| 13| 98|[55.0,13.0,98.0]|[3.60781805510765...|
| 70| 50| 32| 39| 58|[32.0,39.0,58.0]|[2.09909414115354...|
| 80| 20| 25| 54| 37|[25.0,54.0,37.0]|[1.63991729777620...|
| 40| 33| 44| 83| 27|[44.0,83.0,27.0]|[2.88625444408612...|
+---+---+---+---+---+----------------+--------------------+

# Function just to convert to help build data frame
def extract(row):
  return (row.a, row.b,) + tuple(row.scaledFeatures.toArray().tolist())

sdf_scaled = sdf_scaled.select(*cols_to_keep_unscaled, "scaledFeatures").rdd \
        .map(extract).toDF(cols_to_keep_unscaled + cols_to_scale)
  
  
sdf_scaled.show()


+---+---+------------------+-------------------+------------------+
|  a|  b|                 c|                  d|                 e|
+---+---+------------------+-------------------+------------------+
| 51| 13|0.3935801514662892|0.16399957083190683|0.9667572801316145|
| 18| 29| 1.246337146309916|  2.656793047476891|1.0411232247571234|
| 34|  1|2.3614809087977355| 1.8695951074837378|3.2349185912096337|
| 56| 86|3.3454312874634584| 1.7055955366518312|1.7847826710122114|
| 36| 49| 2.164690833064591|0.49199871249572047| 2.007880504888738|
| 87| 53| 3.083044519819266| 2.9191923608079415|3.1605526465841245|
|  7| 14|3.6078180551076513| 0.4263988841629578| 3.643931286649932|
| 70| 50|2.0990941411535426| 1.2791966524888734|2.1566123941397555|
| 80| 20| 1.639917297776205| 1.7711953649845937| 1.375769975571913|
| 40| 33|2.8862544440861213| 2.7223928758096534| 1.003940252444369|
+---+---+------------------+-------------------+------------------+

【讨论】:

不错的解决方案。有没有办法在管道中做到这一点? 也许你可以试试自定义转换器。

以上是关于如何在列子集上实现 PySpark StandardScaler?的主要内容,如果未能解决你的问题,请参考以下文章

PySpark:计算列子集的最大行数并添加到现有数据帧

如何在本地 falcor 模型数据集上实现突变响应

教程 | 使用MNIST数据集,在TensorFlow上实现基础LSTM网络

获取 AttributeError:“OneHotEncoder”对象没有属性“pyspark 中的 _jdf”

列摘要(在Spark数据集上实现多维数据集功能)

在 pyspark 中实现类不平衡算法