Spark SQL:如何将新行附加到数据框表(来自另一个表)

Posted

技术标签:

【中文标题】Spark SQL:如何将新行附加到数据框表(来自另一个表)【英文标题】:Spark SQL: How to append new row to dataframe table (from another table) 【发布时间】:2016-08-23 22:29:33 【问题描述】:

我正在使用带有数据帧的 Spark SQL。我有一个输入数据框,我想将其行附加(或插入)到具有更多列的更大数据框中。我该怎么做?

如果这是 SQL,我会使用 INSERT INTO OUTPUT SELECT ... FROM INPUT,但我不知道如何使用 Spark SQL。

为了具体:

var input = sqlContext.createDataFrame(Seq(
        (10L, "Joe Doe", 34),
        (11L, "Jane Doe", 31),
        (12L, "Alice Jones", 25)
        )).toDF("id", "name", "age")

var output = sqlContext.createDataFrame(Seq(
        (0L, "Jack Smith", 41, "yes", 1459204800L),
        (1L, "Jane Jones", 22, "no", 1459294200L),
        (2L, "Alice Smith", 31, "", 1459595700L)
        )).toDF("id", "name", "age", "init", "ts")


scala> input.show()
+---+-----------+---+
| id|       name|age|
+---+-----------+---+
| 10|    Joe Doe| 34|
| 11|   Jane Doe| 31|
| 12|Alice Jones| 25|
+---+-----------+---+

scala> input.printSchema()
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)


scala> output.show()
+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0| Jack Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
+---+-----------+---+----+----------+

scala> output.printSchema()
root
 |-- id: long (nullable = false)
 |-- name: string (nullable = true)
 |-- age: integer (nullable = false)
 |-- init: string (nullable = true)
 |-- ts: long (nullable = false)

我想将input 的所有行追加到output 的末尾。同时,我想将initoutput 列设置为空字符串'',将ts 列设置为当前时间戳,例如1461883875L.

任何帮助将不胜感激。

【问题讨论】:

顺便说一下,分布式数据没有“开始”也没有“结束”,你应该忘记索引。此外,在 Scala 中使用 vars 是一种不好的做法。 ts 在您显示它不可为空的架构中的默认值是什么 【参考方案1】:

Spark DataFrames 是不可变的,因此无法追加/插入行。相反,您可以添加缺失的列并使用UNION ALL:

output.unionAll(input.select($"*", lit(""), current_timestamp.cast("long")))

【讨论】:

这太棒了。谢谢。 可爱,我正准备回答output.unionAll(input.select(input.columns.map(col) ++ List(lit("").as("init"), lit(0L).as("ts")) :_*)).show,但你的回答显然更好 我也在考虑:var input2 = input.withColumn("init", lit(null:String)).withColumn("ts", current_timestamp.cast("long")) 然后做 unionAll()【参考方案2】:

我遇到了与您的 SQL 问题匹配的类似问题:

我想将数据框附加到现有的配置单元表中,该表也更大(更多列)。保留您的示例:output 是我现有的表,input 可能是数据框。我的解决方案只使用 SQL,为了完整起见,我想提供它:

import org.apache.spark.sql.SaveMode

var input = spark.createDataFrame(Seq(
        (10L, "Joe Doe", 34),
        (11L, "Jane Doe", 31),
        (12L, "Alice Jones", 25)
        )).toDF("id", "name", "age")

//--> just for a running example: In my case the table already exists
var output = spark.createDataFrame(Seq(
        (0L, "Jack Smith", 41, "yes", 1459204800L),
        (1L, "Jane Jones", 22, "no", 1459294200L),
        (2L, "Alice Smith", 31, "", 1459595700L)
        )).toDF("id", "name", "age", "init", "ts")

output.write.mode(SaveMode.Overwrite).saveAsTable("appendTest");
//<--

input.createOrReplaceTempView("inputTable");

spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, null, null FROM inputTable");
val df = spark.sql("SELECT * FROM appendTest")
df.show()

哪个输出:

+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0| Jack Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
| 12|Alice Jones| 25|null|      null|
| 11|   Jane Doe| 31|null|      null|
| 10|    Joe Doe| 34|null|      null|
+---+-----------+---+----+----------+

如果您可能有问题,即您不知道缺少多少字段,您可以使用diff 之类的

val missingFields = output.schema.toSet.diff(input.schema.toSet)

然后(在错误的伪代码中)

val sqlQuery = "INSERT INTO TABLE appendTest SELECT " + commaSeparatedColumnNames + commaSeparatedNullsForEachMissingField + " FROM inputTable"

希望能帮助有类似问题的人!

P.S.:在您的特殊情况下(当前时间戳 + init 的空字段)您甚至可以使用

spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, '' as init, current_timestamp as ts FROM inputTable");

导致

+---+-----------+---+----+----------+
| id|       name|age|init|        ts|
+---+-----------+---+----+----------+
|  0| Jack Smith| 41| yes|1459204800|
|  1| Jane Jones| 22|  no|1459294200|
|  2|Alice Smith| 31|    |1459595700|
| 12|Alice Jones| 25|    |1521128513|
| 11|   Jane Doe| 31|    |1521128513|
| 10|    Joe Doe| 34|    |1521128513|
+---+-----------+---+----+----------+

【讨论】:

有没有不用SQL的办法

以上是关于Spark SQL:如何将新行附加到数据框表(来自另一个表)的主要内容,如果未能解决你的问题,请参考以下文章

如何通过 Synapse 的 Spark 池将数据帧数据附加到专用的 SQL 池中?

如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row

以角度将行附加到现有数据列表

如何使用 fast-csv npm 将新行或新行的数据(新行)附加到现有的 csv 文件

如何链接多个组合框表中的两个组合框?

如何将新行附加到熊猫中的数据框?