火花连接数据帧和合并模式

Posted

技术标签:

【中文标题】火花连接数据帧和合并模式【英文标题】:spark concatenate data frames and merge schema 【发布时间】:2017-04-26 10:52:41 【问题描述】:

我在 spark 中有几个数据帧,开头部分相似的架构(标题)和最后不同的列(自定义)。

case class First(header1:String, header2:String, header3:Int, custom1:String)
case class Second(header1:String, header2:String, header3:Int, custom1:String, custom5:String)
case class Third(header1:String, header2:String, header3:Int, custom2:String, custom3:Int, custom4:Double)

val first = Seq(First("A", "Ba1", 1, "custom1"), First("A", "Ba2", 2, "custom2")).toDS
val second = Seq(Second("B", "Bb1", 1, "custom12", "custom5"), Second("B", "Bb2", 22, "custom12", "custom55")).toDS
val third = Seq(Third("A", "Bc1", 1, "custom2", 22, 44.4)).toDS

这可能看起来像:

+-------+-------+-------+-------+
|header1|header2|header3|custom1|
+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|
|      A|    Ba2|      2|custom2|
+-------+-------+-------+-------+


+-------+-------+-------+--------+--------+
|header1|header2|header3| custom1| custom5|
+-------+-------+-------+--------+--------+
|      B|    Bb1|      1|custom12| custom5|
|      B|    Bb2|     22|custom12|custom55|
+-------+-------+-------+--------+--------+


+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+
|      A|    Bc1|      1|custom2|     22|   44.4|
+-------+-------+-------+-------+-------+-------+

如何合并架构以将所有数据帧基本上连接到一个架构中

case class All(header1:String, header2:String, header3:Int, custom1:Option[String], custom3:Option[String],
                custom4: Option[Double], custom5:Option[String], type:String)

哪些不存在的列可以为空?

如果数据框中的第一条记录命名为 first,则输出应如下所示

+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom2|custom3|custom4|custom5|
+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|      B|      1|custom1|Nan    |Nan    |    Nan|  Nan. |
+-------+-------+-------+-------+-------+-------+-------+-------+

我正在考虑通过标题列连接数据框,但是,只有一些(比如说 header1)会保存相同的(实际上可连接的)值,而其他的(header2,3)会保存不同的值,即

first
    .join(second, Seq("header1", "header2", "header3"), "LEFT")
    .join(third, Seq("header1", "header2", "header3"), "LEFT")
  .show

导致

+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|header1|header2|header3|custom1|custom1|custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+
|      A|    Ba1|      1|custom1|   null|   null|   null|   null|   null|
|      A|    Ba2|      2|custom2|   null|   null|   null|   null|   null|
+-------+-------+-------+-------+-------+-------+-------+-------+-------+

不正确,因为我只想pd.Concat(axis=0) 数据帧,即缺少大部分记录。 此外,它还缺少标识原始数据框的type 列,即first, second, third

编辑

我认为经典的全外连接是解决方案

first
    .join(second, Seq("header1", "header2", "header3"), "fullouter")
    .join(third, Seq("header1", "header2", "header3"), "fullouter")
  .show

产量:

+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|header1|header2|header3|custom1| custom1| custom5|custom2|custom3|custom4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+
|      A|    Ba1|      1|custom1|    null|    null|   null|   null|   null|
|      A|    Ba2|      2|custom2|    null|    null|   null|   null|   null|
|      A|    Bb1|      1|   null|custom12| custom5|   null|   null|   null|
|      A|    Bb2|     22|   null|custom12|custom55|   null|   null|   null|
|      A|    Bc1|      1|   null|    null|    null|custom2|     22|   44.4|
+-------+-------+-------+-------+--------+--------+-------+-------+-------+

如您所见,实际上永远不会有真正的连接,行是串联的。是否有更简单的操作来实现相同的功能? 这个答案不是最优的,因为custom1 是一个重复的名称。我宁愿看到一个 custom1 列(如果有第二个要填充,则没有空值)。

【问题讨论】:

【参考方案1】:

查看我的comment to similar question。基本上你需要联合所有的框架。要制作类似的架构,您需要使用dataframe.withColumn(ColumnName, expr("null")) 表达式:

import org.apache.spark.sql.functions._  
val first1 = first.withColumn("custom5", expr("null"))
                  .withColumn("custom4", expr("null"))
val second2 = second.withColumn("custom4", expr("null"))
val result = first1.unionAll(second2).unionAll(third)

【讨论】:

虽然 union all 在 2.1 中似乎已被弃用用于数据集。【参考方案2】:

如果能提供所需的结果,请测试 SQL Union 方法。

SELECT header1,
       header2,
       header3,
       custom1,
       To_char(NULL)   "custom2",
       To_char(NULL)   "custom3",
       To_number(NULL) "custom4",
       To_char(NULL)   "custom5"
FROM   table1
UNION
SELECT header1,
       header2,
       header3,
       custom1,
       To_char(NULL)   "custom2",
       To_char(NULL)   "custom3",
       To_number(NULL) "custom4",
       custom5
FROM   table2
UNION
SELECT header1,
       header2,
       header3,
       To_char(NULL) "custom1",
       custom2,
       custom3,
       custom4,
       To_char(NULL) "custom5"
FROM   table3;

【讨论】:

【参考方案3】:

如果您将文件写入 HDFS,那么您可以通过将以下属性 Spark.sql.parquet.mergeSchema 设置为 TRUE 并将文件写入 HDFS 位置来实现。

它会自动更新架构并返回所有列。

您可以通过以下方式实现此目的

    withColumn 和 union 在自身之前指定架构并执行联合
spark.conf.set("spark.sql.parquet.mergeSchema","true")
eb = spark.read.format("csv").schema(schem).option("path","/retail/ebay.csv").load()
eb.printSchema()
eb.write.format("parquet").mode("append").save("/retail/parquet_test")

from pyspark.sql.functions import lit

eb1 = eb.withColumn("dummy",lit(35))
eb1.printSchema()


eb1.write.format("parquet").mode("append").save("/retail/parquet_test")

eb2 = spark.read.parquet("/srinchin/parquet_test")
eb2.printSchema()

【讨论】:

以上是关于火花连接数据帧和合并模式的主要内容,如果未能解决你的问题,请参考以下文章

合并,合并火花数据框中的 2 列

如何在火花中合并两个不同的数据帧? [复制]

火花重新分区和合并

连接单独处理的火花数据帧的两列时的顺序保证是啥?

MySQL数据库联合查询与连接查询

火花数据集的转换