spark concatenate data frames and merge schema

Posted: 2017-04-26 10:52:41

I have several data frames in Spark whose schemas share the same leading columns (header) but end in different columns (custom).
case class First(header1:String, header2:String, header3:Int, custom1:String)
case class Second(header1:String, header2:String, header3:Int, custom1:String, custom5:String)
case class Third(header1:String, header2:String, header3:Int, custom2:String, custom3:Int, custom4:Double)
val first = Seq(First("A", "Ba1", 1, "custom1"), First("A", "Ba2", 2, "custom2")).toDS
val second = Seq(Second("B", "Bb1", 1, "custom12", "custom5"), Second("B", "Bb2", 22, "custom12", "custom55")).toDS
val third = Seq(Third("A", "Bc1", 1, "custom2", 22, 44.4)).toDS
Printed, these look like:
+-------+-------+-------+-------+
|header1|header2|header3|custom1|
+-------+-------+-------+-------+
| A| Ba1| 1|custom1|
| A| Ba2| 2|custom2|
+-------+-------+-------+-------+
+-------+-------+-------+--------+--------+
|header1|header2|header3| custom1| custom5|
+-------+-------+-------+--------+--------+
| B| Bb1| 1|custom12| custom5|
| B| Bb2| 22|custom12|custom55|
+-------+-------+-------+--------+--------+
+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+
| A| Bc1| 1|custom2| 22| 44.4|
+-------+-------+-------+-------+-------+-------+
How can I merge the schemas so that all the data frames are essentially concatenated into one common schema,

case class All(header1: String, header2: String, header3: Int,
               custom1: Option[String], custom2: Option[String], custom3: Option[Int],
               custom4: Option[Double], custom5: Option[String], `type`: String)

where columns that do not exist in a given frame are nullable?

If the first record came from the data frame named first, the output should look like this:
+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom2|custom3|custom4|custom5|
+-------+-------+-------+-------+-------+-------+-------+-------+
| A| Ba1| 1|custom1| null| null| null| null|
+-------+-------+-------+-------+-------+-------+-------+-------+
I was thinking about joining the data frames on the header columns, but only some of them (say header1) would hold the same (actually joinable) values, while the others (header2, header3) would hold different values, i.e.
first
.join(second, Seq("header1", "header2", "header3"), "LEFT")
.join(third, Seq("header1", "header2", "header3"), "LEFT")
.show
resulting in
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom1|custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
| A| Ba1| 1|custom1| null| null| null| null| null|
| A| Ba2| 2|custom2| null| null| null| null| null|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
which is not correct, since I basically want a pd.concat(axis=0) of the data frames, i.e. most of the records end up missing. It also lacks a type column identifying the original data frame, i.e. first, second, third.
Edit
I believe the classic full outer join is the solution:
first
.join(second, Seq("header1", "header2", "header3"), "fullouter")
.join(third, Seq("header1", "header2", "header3"), "fullouter")
.show
yields:
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|header1|header2|header3|custom1| custom1| custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
| A| Ba1| 1|custom1| null| null| null| null| null|
| A| Ba2| 2|custom2| null| null| null| null| null|
| B| Bb1| 1| null|custom12| custom5| null| null| null|
| B| Bb2| 22| null|custom12|custom55| null| null| null|
| A| Bc1| 1| null| null| null|custom2| 22| 44.4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
As you can see, there is never actually a real join taking place; the rows are simply concatenated. Is there a simpler operation that achieves the same thing?

Still, this result is not optimal, since custom1 appears as a duplicated column name. I would rather see a single custom1 column (with no null values where a second frame fills it).
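On newer Spark versions this is a built-in operation; a minimal sketch, assuming Spark 3.1+, where unionByName accepts allowMissingColumns and fills absent columns with null:

import org.apache.spark.sql.functions.lit

val all = first.toDF.withColumn("type", lit("first"))
  .unionByName(second.toDF.withColumn("type", lit("second")), allowMissingColumns = true)
  .unionByName(third.toDF.withColumn("type", lit("third")), allowMissingColumns = true)

all.show

This also carries the type column identifying the originating frame.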
Answer 1:

See my comment to similar question. Basically you need to union all the frames. To make the schemas match, add the missing columns with a dataframe.withColumn(ColumnName, expr("null")) expression:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

// unionAll matches columns by position, not by name, so pad every
// frame to the same column set and order before unioning.
val cols = Seq("header1", "header2", "header3", "custom1", "custom2", "custom3", "custom4", "custom5")

def pad(df: DataFrame): DataFrame = {
  val padded = cols.foldLeft(df)((d, c) => if (d.columns.contains(c)) d else d.withColumn(c, expr("null")))
  padded.select(cols.head, cols.tail: _*)
}

val result = pad(first.toDF).unionAll(pad(second.toDF)).unionAll(pad(third.toDF))
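On Spark 2.0+ the same padding works with union, which replaced the deprecated unionAll and keeps the same positional semantics; a minimal sketch reusing the pad helper above:

val result2 = pad(first.toDF).union(pad(second.toDF)).union(pad(third.toDF))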
Comments:
Though unionAll appears to be deprecated for Datasets as of 2.1.

Answer 2:

Test whether the SQL union approach gives the desired result. UNION ALL keeps duplicate rows, matching a plain concatenation, whereas UNION would deduplicate:
SELECT header1,
       header2,
       header3,
       custom1,
       CAST(NULL AS STRING) AS custom2,
       CAST(NULL AS INT)    AS custom3,
       CAST(NULL AS DOUBLE) AS custom4,
       CAST(NULL AS STRING) AS custom5
FROM   table1
UNION ALL
SELECT header1,
       header2,
       header3,
       custom1,
       CAST(NULL AS STRING) AS custom2,
       CAST(NULL AS INT)    AS custom3,
       CAST(NULL AS DOUBLE) AS custom4,
       custom5
FROM   table2
UNION ALL
SELECT header1,
       header2,
       header3,
       CAST(NULL AS STRING) AS custom1,
       custom2,
       custom3,
       custom4,
       CAST(NULL AS STRING) AS custom5
FROM   table3;
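For this to run in Spark SQL, the Datasets first have to be registered under the table names the query refers to; a minimal sketch, assuming Spark 2.x and a hypothetical unionQuery string holding the statement above:

first.createOrReplaceTempView("table1")
second.createOrReplaceTempView("table2")
third.createOrReplaceTempView("table3")

// unionQuery is assumed to be a String containing the UNION ALL statement above
val merged = spark.sql(unionQuery)
merged.show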
Answer 3:

If you write the files out to HDFS, you can achieve this by setting the property spark.sql.parquet.mergeSchema to true and writing the files to the same HDFS location. The schema is then updated automatically and all columns are returned when the location is read back.
You can achieve this in the following ways:
- withColumn and union
- specifying the schema up front and performing the union
from pyspark.sql.functions import lit

# Enable schema merging when reading parquet
spark.conf.set("spark.sql.parquet.mergeSchema", "true")

# schem is the predefined schema for the CSV file
eb = spark.read.format("csv").schema(schem).option("path", "/retail/ebay.csv").load()
eb.printSchema()
eb.write.format("parquet").mode("append").save("/retail/parquet_test")

# Append a second write with an extra column
eb1 = eb.withColumn("dummy", lit(35))
eb1.printSchema()
eb1.write.format("parquet").mode("append").save("/retail/parquet_test")

# Reading the location back merges both schemas and returns all columns
eb2 = spark.read.parquet("/retail/parquet_test")
eb2.printSchema()
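Schema merging can also be requested per read instead of via the global setting; a minimal sketch (in Scala, assuming the same path as above):

val eb2 = spark.read.option("mergeSchema", "true").parquet("/retail/parquet_test")
eb2.printSchema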