标准缩放它在 pyspark 数据帧中花费了太多时间
Posted
技术标签:
【中文标题】标准缩放它在 pyspark 数据帧中花费了太多时间【英文标题】:Standard Scaling it taking too much time in pyspark dataframe 【发布时间】:2021-08-17 05:58:11 【问题描述】:我已尝试使用 spark.ml 中的标准缩放器,其功能如下:
def standard_scale_2(df, columns_to_scale):
"""
Args:
df : spark dataframe
columns_to_scale : list of columns to standard scale
"""
from pyspark.ml.feature import StandardScaler
from pyspark.ml import Pipeline
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.functions import vector_to_array
# UDF for converting column type from vector to double type
unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())
# Iterating over columns to be scaled
for i in columns_to_scale:
# VectorAssembler Transformation - Converting column to vector type
assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")
# MinMaxScaler Transformation
scaler = StandardScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")
# Pipeline of VectorAssembler and MinMaxScaler
pipeline = Pipeline(stages=[assembler, scaler])
# Fitting pipeline on dataframe
df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect",i).withColumnRenamed(i+"_scaled",i)
return df
除了对每一列进行迭代之外,我还尝试一次缩放所有列,但也没有用。
我也尝试过使用这个简单的 udf 进行标准缩放:
for column in columns_to_standard_scale:
sdf = sdf.withColumn(column,
F.col(column) / sdf.agg(stddev_samp(column)).first()[0])
print(column, " completed")
我在 databricks 中使用带有 c5d.2xlarge(16 gb 内存 8 核) 个节点(最多 30 个节点)的 spark 集群。 spark 数据帧的大小只有 100k。 我需要扩展大约 90 列。 但是每列大约需要 10 分钟来扩展,当我尝试一次扩展所有列时,脚本即使在 2 小时后也没有完成。 但是使用 sklearn 标准缩放器,熊猫中的相同数据帧几乎不需要 2 分钟。
我认为代码或数据框没有任何问题,但我遗漏了一些造成瓶颈的东西,而且这个简单的操作花费了太多时间。
【问题讨论】:
【参考方案1】:我在尝试构建列缩放管道时遇到了类似的问题。在我的数据集中,有 400 个特征,首先我想将它们作为一个单独的管道步骤添加:
stages = []
for i, col_to_scale in enumerate(scallarInputs):
col_scaler = StandardScaler(inputCol=col_to_scale,
outputCol=col_to_scale+"_scaled",withStd=True, withMean=withMean)
stages += [col_scaler]
pipeline = Pipeline(stages = stages)
pipelineModel = pipeline.fit(df)
对于我的数据集,运行需要 六个 小时!
然后我决定先做一个矢量组装然后缩放它:
stages = []
assemblerInputs = df.columns
assemblerInputs = [column for column in assemblerInputs if column not in columns_to_remove_from_assembler]
#add vector assembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features_nonscaled")
stages += [assembler]
col_scaler = StandardScaler(inputCol='features_nonscaled', outputCol='features',withStd=True, withMean=False)
stages += [col_scaler]
pipeline = Pipeline(stages = stages)
assemblerModel = pipeline.fit(df)
一切都花了 17 秒!
希望对大家有帮助
【讨论】:
【参考方案2】:标准缩放火花代码没有任何问题。这是 spark 的 lazy evaluation
,我之前没有意识到,我认为这个标准缩放函数有问题。
其实lazy evaluation
的意思是spark会等到最后一刻才执行计算指令图。
在执行这个标准缩放函数之前,我执行了一个回填函数。回填功能实际上是瓶颈,因为当我评论那部分时,我的 spark 应用程序运行良好。
此外,回填功能具有交叉连接、groubBy 等wide transformations
,这非常低效,因为它会导致大量洗牌操作。因此,我修改了该函数,因此,我的整个 spark 应用程序在 30 秒内完成。
【讨论】:
以上是关于标准缩放它在 pyspark 数据帧中花费了太多时间的主要内容,如果未能解决你的问题,请参考以下文章
Pyspark:内部连接两个 pyspark 数据帧并从第一个数据帧中选择所有列,从第二个数据帧中选择几列