Spark SQL:如何将新行附加到数据框表(来自另一个表)
Posted
技术标签:
【中文标题】Spark SQL:如何将新行附加到数据框表(来自另一个表)【英文标题】:Spark SQL: How to append new row to dataframe table (from another table) 【发布时间】:2016-08-23 22:29:33 【问题描述】:我正在使用带有数据帧的 Spark SQL。我有一个输入数据框,我想将其行附加(或插入)到具有更多列的更大数据框中。我该怎么做?
如果这是 SQL,我会使用 INSERT INTO OUTPUT SELECT ... FROM INPUT
,但我不知道如何使用 Spark SQL。
为了具体:
var input = sqlContext.createDataFrame(Seq(
(10L, "Joe Doe", 34),
(11L, "Jane Doe", 31),
(12L, "Alice Jones", 25)
)).toDF("id", "name", "age")
var output = sqlContext.createDataFrame(Seq(
(0L, "Jack Smith", 41, "yes", 1459204800L),
(1L, "Jane Jones", 22, "no", 1459294200L),
(2L, "Alice Smith", 31, "", 1459595700L)
)).toDF("id", "name", "age", "init", "ts")
scala> input.show()
+---+-----------+---+
| id| name|age|
+---+-----------+---+
| 10| Joe Doe| 34|
| 11| Jane Doe| 31|
| 12|Alice Jones| 25|
+---+-----------+---+
scala> input.printSchema()
root
|-- id: long (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
scala> output.show()
+---+-----------+---+----+----------+
| id| name|age|init| ts|
+---+-----------+---+----+----------+
| 0| Jack Smith| 41| yes|1459204800|
| 1| Jane Jones| 22| no|1459294200|
| 2|Alice Smith| 31| |1459595700|
+---+-----------+---+----+----------+
scala> output.printSchema()
root
|-- id: long (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
|-- init: string (nullable = true)
|-- ts: long (nullable = false)
我想将input
的所有行追加到output
的末尾。同时,我想将init
的output
列设置为空字符串''
,将ts
列设置为当前时间戳,例如1461883875L.
任何帮助将不胜感激。
【问题讨论】:
顺便说一下,分布式数据没有“开始”也没有“结束”,你应该忘记索引。此外,在 Scala 中使用var
s 是一种不好的做法。 ts
在您显示它不可为空的架构中的默认值是什么
【参考方案1】:
Spark DataFrames
是不可变的,因此无法追加/插入行。相反,您可以添加缺失的列并使用UNION ALL
:
output.unionAll(input.select($"*", lit(""), current_timestamp.cast("long")))
【讨论】:
这太棒了。谢谢。 可爱,我正准备回答output.unionAll(input.select(input.columns.map(col) ++ List(lit("").as("init"), lit(0L).as("ts")) :_*)).show
,但你的回答显然更好
我也在考虑:var input2 = input.withColumn("init", lit(null:String)).withColumn("ts", current_timestamp.cast("long"))
然后做 unionAll()【参考方案2】:
我遇到了与您的 SQL 问题匹配的类似问题:
我想将数据框附加到现有的配置单元表中,该表也更大(更多列)。保留您的示例:output
是我现有的表,input
可能是数据框。我的解决方案只使用 SQL,为了完整起见,我想提供它:
import org.apache.spark.sql.SaveMode
var input = spark.createDataFrame(Seq(
(10L, "Joe Doe", 34),
(11L, "Jane Doe", 31),
(12L, "Alice Jones", 25)
)).toDF("id", "name", "age")
//--> just for a running example: In my case the table already exists
var output = spark.createDataFrame(Seq(
(0L, "Jack Smith", 41, "yes", 1459204800L),
(1L, "Jane Jones", 22, "no", 1459294200L),
(2L, "Alice Smith", 31, "", 1459595700L)
)).toDF("id", "name", "age", "init", "ts")
output.write.mode(SaveMode.Overwrite).saveAsTable("appendTest");
//<--
input.createOrReplaceTempView("inputTable");
spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, null, null FROM inputTable");
val df = spark.sql("SELECT * FROM appendTest")
df.show()
哪个输出:
+---+-----------+---+----+----------+
| id| name|age|init| ts|
+---+-----------+---+----+----------+
| 0| Jack Smith| 41| yes|1459204800|
| 1| Jane Jones| 22| no|1459294200|
| 2|Alice Smith| 31| |1459595700|
| 12|Alice Jones| 25|null| null|
| 11| Jane Doe| 31|null| null|
| 10| Joe Doe| 34|null| null|
+---+-----------+---+----+----------+
如果您可能有问题,即您不知道缺少多少字段,您可以使用diff
之类的
val missingFields = output.schema.toSet.diff(input.schema.toSet)
然后(在错误的伪代码中)
val sqlQuery = "INSERT INTO TABLE appendTest SELECT " + commaSeparatedColumnNames + commaSeparatedNullsForEachMissingField + " FROM inputTable"
希望能帮助有类似问题的人!
P.S.:在您的特殊情况下(当前时间戳 + init 的空字段)您甚至可以使用
spark.sql("INSERT INTO TABLE appendTest SELECT id, name, age, '' as init, current_timestamp as ts FROM inputTable");
导致
+---+-----------+---+----+----------+
| id| name|age|init| ts|
+---+-----------+---+----+----------+
| 0| Jack Smith| 41| yes|1459204800|
| 1| Jane Jones| 22| no|1459294200|
| 2|Alice Smith| 31| |1459595700|
| 12|Alice Jones| 25| |1521128513|
| 11| Jane Doe| 31| |1521128513|
| 10| Joe Doe| 34| |1521128513|
+---+-----------+---+----+----------+
【讨论】:
有没有不用SQL的办法以上是关于Spark SQL:如何将新行附加到数据框表(来自另一个表)的主要内容,如果未能解决你的问题,请参考以下文章
如何通过 Synapse 的 Spark 池将数据帧数据附加到专用的 SQL 池中?
如何将列添加到 mapPartitions 内的 org.apache.spark.sql.Row