标准缩放它在 pyspark 数据帧中花费了太多时间

Posted

技术标签:

【中文标题】标准缩放它在 pyspark 数据帧中花费了太多时间【英文标题】:Standard Scaling it taking too much time in pyspark dataframe 【发布时间】:2021-08-17 05:58:11 【问题描述】:

我已尝试使用 spark.ml 中的标准缩放器,其功能如下:

def standard_scale_2(df, columns_to_scale):
    """
    Args:
    df : spark dataframe
    columns_to_scale : list of columns to standard scale
    """
    from pyspark.ml.feature import StandardScaler
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler
    from pyspark.ml.functions import vector_to_array
    
    # UDF for converting column type from vector to double type
    unlist = udf(lambda x: round(float(list(x)[0]),3), DoubleType())
    
    # Iterating over columns to be scaled
    for i in columns_to_scale:

      # VectorAssembler Transformation - Converting column to vector type
      assembler = VectorAssembler(inputCols=[i],outputCol=i+"_Vect")

      # MinMaxScaler Transformation
      scaler = StandardScaler(inputCol=i+"_Vect", outputCol=i+"_Scaled")

      # Pipeline of VectorAssembler and MinMaxScaler
      pipeline = Pipeline(stages=[assembler, scaler])

      # Fitting pipeline on dataframe
      df = pipeline.fit(df).transform(df).withColumn(i+"_Scaled", unlist(i+"_Scaled")).drop(i+"_Vect",i).withColumnRenamed(i+"_scaled",i)
    return df

除了对每一列进行迭代之外,我还尝试一次缩放所有列,但也没有用。

我也尝试过使用这个简单的 udf 进行标准缩放:

for column in columns_to_standard_scale:
         sdf = sdf.withColumn(column,
                       F.col(column) / sdf.agg(stddev_samp(column)).first()[0])
         print(column, " completed")

我在 databricks 中使用带有 c5d.2xlarge(16 gb 内存 8 核) 个节点(最多 30 个节点)的 spark 集群。 spark 数据帧的大小只有 100k。 我需要扩展大约 90 列。 但是每列大约需要 10 分钟来扩展,当我尝试一次扩展所有列时,脚本即使在 2 小时后也没有完成。 但是使用 sklearn 标准缩放器,熊猫中的相同数据帧几乎不需要 2 分钟。

我认为代码或数据框没有任何问题,但我遗漏了一些造成瓶颈的东西,而且这个简单的操作花费了太多时间。

【问题讨论】:

【参考方案1】:

我在尝试构建列缩放管道时遇到了类似的问题。在我的数据集中,有 400 个特征,首先我想将它们作为一个单独的管道步骤添加:

stages = []    
for i,  col_to_scale in enumerate(scallarInputs):
            col_scaler = StandardScaler(inputCol=col_to_scale, 
            outputCol=col_to_scale+"_scaled",withStd=True, withMean=withMean)
            stages += [col_scaler]

    pipeline = Pipeline(stages = stages)
    pipelineModel = pipeline.fit(df)
  

对于我的数据集,运行需要 六个 小时!

然后我决定先做一个矢量组装然后缩放它:

stages = []
assemblerInputs = df.columns
assemblerInputs = [column for column in assemblerInputs if column not in columns_to_remove_from_assembler]
#add vector assembler
assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features_nonscaled")
stages += [assembler]

col_scaler = StandardScaler(inputCol='features_nonscaled', outputCol='features',withStd=True, withMean=False)
stages += [col_scaler]

pipeline = Pipeline(stages = stages)
assemblerModel = pipeline.fit(df)

一切都花了 17 秒!

希望对大家有帮助

【讨论】:

【参考方案2】:

标准缩放火花代码没有任何问题。这是 spark 的 lazy evaluation,我之前没有意识到,我认为这个标准缩放函数有问题。 其实lazy evaluation的意思是spark会等到最后一刻才执行计算指令图。 在执行这个标准缩放函数之前,我执行了一个回填函数。回填功能实际上是瓶颈,因为当我评论那部分时,我的 spark 应用程序运行良好。 此外,回填功能具有交叉连接、groubBy 等wide transformations,这非常低效,因为它会导致大量洗牌操作。因此,我修改了该函数,因此,我的整个 spark 应用程序在 30 秒内完成。

【讨论】:

以上是关于标准缩放它在 pyspark 数据帧中花费了太多时间的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:内部连接两个 pyspark 数据帧并从第一个数据帧中选择所有列,从第二个数据帧中选择几列

当 ID 匹配时,在其他 Pyspark 数据帧中按列划分 Pyspark 数据帧列

pyspark 数据帧中的完全外连接

删除 pyspark 数据帧中的空格

使用pyspark计算每行数据帧中的总值

pyspark,比较数据帧中的两行