如何动态地将列添加到 DataFrame?
Posted
技术标签:
【中文标题】如何动态地将列添加到 DataFrame?【英文标题】:How to dynamically add columns to a DataFrame? 【发布时间】:2020-01-20 11:44:46 【问题描述】:我正在尝试从 String 的 Seq 动态地将列添加到 DataFrame。
这是一个例子:源数据框是这样的:
+-----+---+----+---+---+
|id | A | B | C | D |
+-----+---+----+---+---+
|1 |toto|tata|titi| |
|2 |bla |blo | | |
|3 |b | c | a | d |
+-----+---+----+---+---+
我还有一个字符串序列,其中包含我要添加的列的名称。如果源 DataFrame 中已经存在一个列,它必须做一些如下的区别:
序列看起来像:
val columns = Seq("A", "B", "F", "G", "H")
期望是:
+-----+---+----+---+---+---+---+---+
|id | A | B | C | D | F | G | H |
+-----+---+----+---+---+---+---+---+
|1 |toto|tata|titi|tutu|null|null|null
|2 |bla |blo | | |null|null|null|
|3 |b | c | a | d |null|null|null|
+-----+---+----+---+---+---+---+---+
到目前为止我所做的是这样的:
val difference = columns diff sourceDF.columns
val finalDF = difference.foldLeft(sourceDF)((df, field) => if (!sourceDF.columns.contains(field)) df.withColumn(field, lit(null))) else df)
.select(columns.head, columns.tail:_*)
但我不知道如何以更简单、更容易阅读的方式有效地使用 Spark...
提前致谢
【问题讨论】:
如何使用difference.foreach(x->df=df.withColumn(x,lit(null)))
【参考方案1】:
这是使用Seq.diff
、单个select
和map
生成最终列列表的另一种方式:
import org.apache.spark.sql.functions.lit, col
val newCols = Seq("A", "B", "F", "G", "H")
val updatedCols = newCols.diff(df.columns).map c => lit(null).as(c)
val selectExpr = df.columns.map(col) ++ updatedCols
df.select(selectExpr:_*).show
// +---+----+----+----+----+----+----+----+
// | id| A| B| C| D| F| G| H|
// +---+----+----+----+----+----+----+----+
// | 1|toto|tata|titi|null|null|null|null|
// | 2| bla| blo|null|null|null|null|null|
// | 3| b| c| a| d|null|null|null|
// +---+----+----+----+----+----+----+----+
首先我们找到 newCols 和 df.columns 之间的差异,这给了我们:F, G, H
。接下来,我们通过map
函数将列表的每个元素转换为lit(null).as(c)
。最后,我们将现有列表和新列表连接在一起生成selectExpr
,用于select
。
【讨论】:
请注意,默认情况下 Spark 使用不区分大小写的列命名。一个简单的Seq.diff
将无法区分被 Spark 视为相同但大小写不同的列名。【参考方案2】:
下面将根据您的逻辑进行优化。
scala> df.show
+---+----+----+----+----+
| id| A| B| C| D|
+---+----+----+----+----+
| 1|toto|tata|titi|null|
| 2| bla| blo|null|null|
| 3| b| c| a| d|
+---+----+----+----+----+
scala> val Columns = Seq("A", "B", "F", "G", "H")
scala> val newCol = Columns filterNot df.columns.toSeq.contains
scala> val df1 = newCol.foldLeft(df)((df,name) => df.withColumn(name, lit(null)))
scala> df1.show()
+---+----+----+----+----+----+----+----+
| id| A| B| C| D| F| G| H|
+---+----+----+----+----+----+----+----+
| 1|toto|tata|titi|null|null|null|null|
| 2| bla| blo|null|null|null|null|null|
| 3| b| c| a| d|null|null|null|
+---+----+----+----+----+----+----+----+
如果您不想使用 foldLeft,那么您可以使用 RunTimeMirror,它会更快。检查下面的代码。
scala> import scala.reflect.runtime.universe.runtimeMirror
scala> import scala.tools.reflect.ToolBox
scala> import org.apache.spark.sql.DataFrame
scala> df.show
+---+----+----+----+----+
| id| A| B| C| D|
+---+----+----+----+----+
| 1|toto|tata|titi|null|
| 2| bla| blo|null|null|
| 3| b| c| a| d|
+---+----+----+----+----+
scala> def compile[A](code: String): DataFrame => A =
| val tb = runtimeMirror(getClass.getClassLoader).mkToolBox()
| val tree = tb.parse(
| s"""
| |import org.elasticsearch.spark.sql._
| |import org.apache.spark.sql.DataFrame
| |def wrapper(context:DataFrame): Any =
| | $code
| |
| |wrapper _
| """.stripMargin)
|
| val fun = tb.compile(tree)
| val wrapper = fun()
| wrapper.asInstanceOf[DataFrame => A]
|
scala> def AddColumns(df:DataFrame,withColumnsString:String):DataFrame =
| val code =
| s"""
| |import org.apache.spark.sql.functions._
| |import org.elasticsearch.spark.sql._
| |import org.apache.spark.sql.DataFrame
| |var data = context.asInstanceOf[DataFrame]
| |data = data
| """ + withColumnsString +
| """
| |
| |data
| """.stripMargin
|
| val fun = compile[DataFrame](code)
| val res = fun(df)
| res
|
scala> val Columns = Seq("A", "B", "F", "G", "H")
scala> val newCol = Columns filterNot df.columns.toSeq.contains
scala> var cols = ""
scala> newCol.foreach name =>
| cols = ".withColumn(\""+ name + "\" , lit(null))" + cols
|
scala> val df1 = AddColumns(df,cols)
scala> df1.show
+---+----+----+----+----+----+----+----+
| id| A| B| C| D| H| G| F|
+---+----+----+----+----+----+----+----+
| 1|toto|tata|titi|null|null|null|null|
| 2| bla| blo|null|null|null|null|null|
| 3| b| c| a| d|null|null|null|
+---+----+----+----+----+----+----+----+
【讨论】:
有没有办法摆脱 foldLeft 功能?根据我的火花历史服务器,这个功能减慢了我的工作,原因我不明白:/ 如果你不想使用那么你可以使用运行时镜像并执行所有withColumns,Ref:programcreek.com/scala/scala.reflect.runtime.universefoldLeft()
减速的可能原因是每次迭代都会执行完整的计划分析。重构为 df.select(col("*") +: newCols.map(name => lit(null).as(name)): _*)
以进行单次分析。
@sim 是的,我同意你的看法。这就是我只使用 withColumn 和 df 的原因,我猜这不会花费时间。
当name
不在df.columns
中时,df.withColumn(name, c)
等效于df.select(col("*"), c.as(name))
。因此,在可能的情况下(添加的列相互独立),使用单个 select
总是比使用多个 withColumn
更快,无论它们是如何发出的:硬编码、循环等.以上是关于如何动态地将列添加到 DataFrame?的主要内容,如果未能解决你的问题,请参考以下文章
从另一个 DataFrame 将列添加到 Pyspark DataFrame
如何使用 DB2 sql 代码动态地将列转置为表的行,其中列可能会随着时间的推移而增加且无需更改代码?