如何动态地将列添加到 DataFrame?

Posted

技术标签:

【中文标题】如何动态地将列添加到 DataFrame?【英文标题】:How to dynamically add columns to a DataFrame? 【发布时间】:2020-01-20 11:44:46 【问题描述】:

我正在尝试从 String 的 Seq 动态地将列添加到 DataFrame。

这是一个例子:源数据框是这样的:

+-----+---+----+---+---+
|id | A | B  | C  | D  |
+-----+---+----+---+---+
|1 |toto|tata|titi|    |
|2 |bla |blo |   |     |
|3 |b   | c  |  a |  d |
+-----+---+----+---+---+

我还有一个字符串序列,其中包含我要添加的列的名称。如果源 DataFrame 中已经存在一个列,它必须做一些如下的区别:

序列看起来像:

val columns = Seq("A", "B", "F", "G", "H")

期望是:

+-----+---+----+---+---+---+---+---+
|id | A | B  | C  | D  | F | G | H |
+-----+---+----+---+---+---+---+---+
|1 |toto|tata|titi|tutu|null|null|null
|2 |bla |blo |   |     |null|null|null|
|3 |b   | c  |  a |  d |null|null|null|
+-----+---+----+---+---+---+---+---+

到目前为止我所做的是这样的:

val difference = columns diff sourceDF.columns
val finalDF = difference.foldLeft(sourceDF)((df, field) => if (!sourceDF.columns.contains(field)) df.withColumn(field, lit(null))) else df)
  .select(columns.head, columns.tail:_*) 

但我不知道如何以更简单、更容易阅读的方式有效地使用 Spark...

提前致谢

【问题讨论】:

如何使用difference.foreach(x->df=df.withColumn(x,lit(null))) 【参考方案1】:

这是使用Seq.diff、单个selectmap 生成最终列列表的另一种方式:

import org.apache.spark.sql.functions.lit, col


val newCols = Seq("A", "B", "F", "G", "H")

val updatedCols = newCols.diff(df.columns).map c => lit(null).as(c)

val selectExpr = df.columns.map(col) ++ updatedCols

df.select(selectExpr:_*).show

// +---+----+----+----+----+----+----+----+
// | id|   A|   B|   C|   D|   F|   G|   H|
// +---+----+----+----+----+----+----+----+
// |  1|toto|tata|titi|null|null|null|null|
// |  2| bla| blo|null|null|null|null|null|
// |  3|   b|   c|   a|   d|null|null|null|
// +---+----+----+----+----+----+----+----+

首先我们找到 newCols 和 df.columns 之间的差异,这给了我们:F, G, H。接下来,我们通过map 函数将列表的每个元素转换为lit(null).as(c)。最后,我们将现有列表和新列表连接在一起生成selectExpr,用于select

【讨论】:

请注意,默认情况下 Spark 使用不区分大小写的列命名。一个简单的Seq.diff 将无法区分被 Spark 视为相同但大小写不同的列名。【参考方案2】:

下面将根据您的逻辑进行优化。

scala> df.show
+---+----+----+----+----+
| id|   A|   B|   C|   D|
+---+----+----+----+----+
|  1|toto|tata|titi|null|
|  2| bla| blo|null|null|
|  3|   b|   c|   a|   d|
+---+----+----+----+----+

scala> val Columns  = Seq("A", "B", "F", "G", "H")

scala> val newCol =  Columns filterNot df.columns.toSeq.contains

scala> val df1 =  newCol.foldLeft(df)((df,name) => df.withColumn(name, lit(null)))
scala> df1.show()
+---+----+----+----+----+----+----+----+
| id|   A|   B|   C|   D|   F|   G|   H|
+---+----+----+----+----+----+----+----+
|  1|toto|tata|titi|null|null|null|null|
|  2| bla| blo|null|null|null|null|null|
|  3|   b|   c|   a|   d|null|null|null|
+---+----+----+----+----+----+----+----+

如果您不想使用 foldLeft,那么您可以使用 RunTimeMirror,它会更快。检查下面的代码。

scala> import scala.reflect.runtime.universe.runtimeMirror
scala> import scala.tools.reflect.ToolBox
scala> import org.apache.spark.sql.DataFrame

scala> df.show
+---+----+----+----+----+
| id|   A|   B|   C|   D|
+---+----+----+----+----+
|  1|toto|tata|titi|null|
|  2| bla| blo|null|null|
|  3|   b|   c|   a|   d|
+---+----+----+----+----+


scala> def compile[A](code: String): DataFrame => A = 
     |     val tb = runtimeMirror(getClass.getClassLoader).mkToolBox()
     |     val tree = tb.parse(
     |       s"""
     |          |import org.elasticsearch.spark.sql._
     |          |import org.apache.spark.sql.DataFrame
     |          |def wrapper(context:DataFrame): Any = 
     |          |  $code
     |          |
     |          |wrapper _
     |       """.stripMargin)
     | 
     |     val fun = tb.compile(tree)
     |     val wrapper = fun()
     |     wrapper.asInstanceOf[DataFrame => A]
     |   


scala> def  AddColumns(df:DataFrame,withColumnsString:String):DataFrame = 
     |     val code =
     |       s"""
     |          |import org.apache.spark.sql.functions._
     |          |import org.elasticsearch.spark.sql._
     |          |import org.apache.spark.sql.DataFrame
     |          |var data = context.asInstanceOf[DataFrame]
     |          |data = data
     |       """ + withColumnsString +
     |         """
     |           |
     |           |data
     |         """.stripMargin
     | 
     |     val fun = compile[DataFrame](code) 
     |     val res = fun(df)
     |     res
     |   


scala> val Columns = Seq("A", "B", "F", "G", "H")     
scala> val newCol =  Columns filterNot df.columns.toSeq.contains

scala> var cols = ""      
scala>  newCol.foreach name =>
     |  cols = ".withColumn(\""+ name + "\" , lit(null))" + cols
     | 

scala> val df1 = AddColumns(df,cols)
scala> df1.show
+---+----+----+----+----+----+----+----+
| id|   A|   B|   C|   D|   H|   G|   F|
+---+----+----+----+----+----+----+----+
|  1|toto|tata|titi|null|null|null|null|
|  2| bla| blo|null|null|null|null|null|
|  3|   b|   c|   a|   d|null|null|null|
+---+----+----+----+----+----+----+----+

【讨论】:

有没有办法摆脱 foldLeft 功能?根据我的火花历史服务器,这个功能减慢了我的工作,原因我不明白:/ 如果你不想使用那么你可以使用运行时镜像并执行所有withColumns,Ref:programcreek.com/scala/scala.reflect.runtime.universe foldLeft() 减速的可能原因是每次迭代都会执行完整的计划分析。重构为 df.select(col("*") +: newCols.map(name => lit(null).as(name)): _*) 以进行单次分析。 @sim 是的,我同意你的看法。这就是我只使用 withColumn 和 df 的原因,我猜这不会花费时间。name 不在df.columns 中时,df.withColumn(name, c) 等效于df.select(col("*"), c.as(name))。因此,在可能的情况下(添加的列相互独立),使用单个 select 总是比使用多个 withColumn 更快,无论它们是如何发出的:硬编码、循环等.

以上是关于如何动态地将列添加到 DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章

使用 Pandas 迭代地将列添加到数据框中

从另一个 DataFrame 将列添加到 Pyspark DataFrame

如何使用 DB2 sql 代码动态地将列转置为表的行,其中列可能会随着时间的推移而增加且无需更改代码?

更快地遍历一个 DataFrame 的行以将列添加到第二个 DataFrame

mysql - 动态地将列拆分为行

如何动态地将数组添加到 react-bootstrap 表中?