在 Spark Scala 中使用“withColumn”函数的替代方法

Posted

技术标签:

【中文标题】在 Spark Scala 中使用“withColumn”函数的替代方法【英文标题】:Alternative to the use of the 'withColumn' function in Spark Scala 【发布时间】:2019-05-27 17:02:06 【问题描述】:

我正在审查开发代码,我需要避免或使用不同的方式在数据框中使用“withColumn”函数来添加列;但我有以下疑问:

    使用嵌套的'withColumn',创建新表(如下面的代码)?使用 6 'withColumn',在内存表中创建 6 个新的?
newDataframe = table
.withColumn("name", col("consolidate").cast(DecimalType(17,2)))
.withColumn("name", col("consolidate").cast(DecimalType(17,2)))

    如果使用许多“withColumn”会增加内存使用量并降低性能(如果为真),如何在数据框中添加列时避免使用“withColumn”并获得相同的结果?

    有没有一种方法可以在不使用'withColumn'的情况下消耗更少的内存并且运行速度更快,但得到相同的结果?,即添加了6列的数据帧

我不知道该怎么做。

要优化的代码是这样的:

def myMethod(table: DataFrame): DataFrame = 
    newDataframe = table
      .withColumn("name", col("consolidate").cast(DecimalType(17,2)))
      .withColumn("id_value", col("east").cast(DecimalType(17,2)))
      .withColumn("x_value", col("daily").cast(DecimalType(17,2)))
      .withColumn("amount", col("paid").cast(DecimalType(17,2)))
      .withColumn("client", col("lima").cast(DecimalType(17,2)))
      .withColumn("capital", col("econo").cast(DecimalType(17,2)))
    newDataframe
  

【问题讨论】:

Spark/Scala repeated calls to withColumn() using the same function on multiple columns的可能重复 提供的答案是正确的,但这里有一些你可以用 withColumn 暗示 foldLeft 的东西,这很可能是你的意思:medium.com/@manuzhang/… 【参考方案1】:

这里有一个误解:Spark 不会在内存中创建 6 个中间数据集。实际上,您的函数不会触发内存中的任何更改,因为 Spark 转换(例如 withColumn)仅在调用操作(例如 .count().show())时才被延迟评估。

当调用动作时,Spark 会优化你的转换并一次性完成,所以调用 6 次 .withColumn 在内存方面没有问题。

【讨论】:

这是真的。还想指出,每个.withColumn 调用都会在驱动程序的内存中创建一个新的数据帧对象,因为DataFrame 是不可变的。因此调用一千个.withColumn 将显示驱动程序堆内存中的峰值,直到 GC。这是由于对象创建而不是数据读取造成的。 @Gsquare 这就是我的理解,但我在 select vs withColumn / foldLeft 上做了一些测试,看不到任何时间差异,因此没有真正的性能差异。我在 2.4 糟糕的社区数据块下尝试过,很好。 2.2 没有显示结果,但 UI 上的作业很好。你能在这里发表评论吗? @Fabich 你能评论一下吗? 我也刚刚在 Databricks 社区集群上设置了一个测试:` val columns = 1 to 100000 import spark.implicits._ import org.apache.spark.sql.functions._ val df = spark. range(100000) val df2 = df.select(columns.map(c => col("id").alias(s"column_c")):_*) val df3 = columns.foldLeft(df.toDF ) case (dataFrame, ind) => dataFrame.withColumn(s"column_ind", col("id")) ` df2 在大约 10 秒内完成评估,而 df3 现在已经运行了 10 多分钟。

以上是关于在 Spark Scala 中使用“withColumn”函数的替代方法的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spark 和 Scala 进行字数统计

如何在 Windows 中使用 Scala 将 Cassandra 与 Spark 连接起来

如何使用 Scala 在 Spark 中进行滑动窗口排名?

如何在窗口 scala/spark 中使用 partitionBy 函数

在 Spark 上使用 Scala 在 Dataframe 中拆分字符串

如何使用 Scala 在 Spark 中爆炸嵌套结构