How to join using a nested column in Spark dataframe
【Title】: How to join using a nested column in Spark dataframe 【Posted】: 2019-08-21 16:12:48 【Question】: I have a dataframe with this schema:
|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Agreement_A1: string (nullable = true)
| | |-- Agreement_A2: string (nullable = true)
Data:
+-----------+-----------+--------------------------------------------------+
|Activity_A1|Activity_A2|Details                                           |
+-----------+-----------+--------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr2_Attr1,Agr2_Attr2], [Agr1_Attr1,Agr1_Attr2]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1,Agr4_Attr2], [Agr3_Attr1,Agr3_Attr2]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1,Agr5_Attr2]]                         |
+-----------+-----------+--------------------------------------------------+
The second dataframe has this schema:
|-- Agreement_A1: string (nullable = true)
|-- Lines: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Line_A1: string (nullable = true)
| | |-- Line_A2: string (nullable = true)
How can I join these two dataframes on the Agreement_A1 column, so that the schema of the new dataframe looks like this:
|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Agreement_A1: string (nullable = true)
| | |-- Agreement_A2: string (nullable = true)
| | |-- Lines: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- Line_A1: string (nullable = true)
| | | | |-- Line_A2: string (nullable = true)
【Answer 1】: Hope this helps. You need to unnest (explode) "Details" and join it with your second dataframe on "Agreement_A1". Then structure the columns as needed.
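The session below starts from an existing `df1` and `df2`. To reproduce it, the two dataframes could be built roughly like this in spark-shell. This is only a sketch: the case class names (`Agreement`, `Activity`, `Line`, `AgreementLines`) and the local master setting are assumptions made for illustration and do not come from the original post; the values are copied from the tables shown in the session.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical case classes mirroring the two schemas in the question.
case class Agreement(Agreement_A1: String, Agreement_A2: String)
case class Activity(Activity_A1: String, Activity_A2: String, Details: Seq[Agreement])
case class Line(Line_A1: String, Line_A2: String)
case class AgreementLines(Agreement_A1: String, Lines: Seq[Line])

// Reuses the running session in spark-shell; the local master is an
// assumption for trying this out on a single machine.
val spark = SparkSession.builder().master("local[*]").appName("nested-join").getOrCreate()
import spark.implicits._

// First dataframe: activities with an array of agreement structs.
val df1 = Seq(
  Activity("Act1_Attr1", "Act1_Attr2",
    Seq(Agreement("Agr2_Attr1", "Agr2_Attr2"), Agreement("Agr1_Attr1", "Agr1_Attr2"))),
  Activity("Act2_Attr1", "Act2_Attr2",
    Seq(Agreement("Agr4_Attr1", "Agr4_Attr2"), Agreement("Agr3_Attr1", "Agr3_Attr2"))),
  Activity("Act3_Attr1", "Act3_Attr2",
    Seq(Agreement("Agr5_Attr1", "Agr5_Attr2")))
).toDF()

// Second dataframe: one row per agreement key with an array of line structs.
val df2 = Seq(
  AgreementLines("Agr1_Attr1", Seq(Line("A1At1Line1", "A1At1Line2"))),
  AgreementLines("Agr3_Attr1", Seq(Line("A3At1Line1", "A3At1Line2"))),
  AgreementLines("Agr4_Attr1", Seq(Line("A4At1Line1", "A4At1Line2"))),
  AgreementLines("Agr5_Attr1", Seq(Line("A5At1Line1", "A5At1Line2"))),
  AgreementLines("Agr6_Attr1", Seq(Line("A6At1Line1", "A6At1Line2")))
).toDF()
```

In a compiled application the case classes would need to live at the top level of the file (outside any method) for the implicit encoders to resolve.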
scala> df1.show(false)
+-----------+-----------+----------------------------------------------------+
|Activity_A1|Activity_A2|Details                                             |
+-----------+-----------+----------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr2_Attr1, Agr2_Attr2], [Agr1_Attr1, Agr1_Attr2]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1, Agr4_Attr2], [Agr3_Attr1, Agr3_Attr2]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1, Agr5_Attr2]]                          |
+-----------+-----------+----------------------------------------------------+
scala> df1.printSchema
root
|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Agreement_A1: string (nullable = true)
| | |-- Agreement_A2: string (nullable = true)
scala> df2.show(false)
+------------+--------------------------+
|Agreement_A1|Lines                     |
+------------+--------------------------+
|Agr1_Attr1  |[[A1At1Line1, A1At1Line2]]|
|Agr3_Attr1  |[[A3At1Line1, A3At1Line2]]|
|Agr4_Attr1  |[[A4At1Line1, A4At1Line2]]|
|Agr5_Attr1  |[[A5At1Line1, A5At1Line2]]|
|Agr6_Attr1  |[[A6At1Line1, A6At1Line2]]|
+------------+--------------------------+
scala> df2.printSchema
root
|-- Agreement_A1: string (nullable = true)
|-- Lines: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Line_A1: string (nullable = true)
| | |-- Line_A2: string (nullable = true)
scala> val outputDF = df1.withColumn("DetailsExploded", explode($"Details")).join(
     |   df2, $"DetailsExploded.Agreement_A1" === $"Agreement_A1").withColumn(
     |   "DetailsWithAgreementA1Lines", struct($"DetailsExploded.Agreement_A1" as "Agreement_A1", $"DetailsExploded.Agreement_A2" as "Agreement_A2", $"Lines"))
outputDF: org.apache.spark.sql.DataFrame = [Activity_A1: string, Activity_A2: string ... 5 more fields]
scala> outputDF.show(false)
+-----------+-----------+----------------------------------------------------+------------------------+------------+--------------------------+----------------------------------------------------+
|Activity_A1|Activity_A2|Details                                             |DetailsExploded         |Agreement_A1|Lines                     |DetailsWithAgreementA1Lines                         |
+-----------+-----------+----------------------------------------------------+------------------------+------------+--------------------------+----------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr2_Attr1, Agr2_Attr2], [Agr1_Attr1, Agr1_Attr2]]|[Agr1_Attr1, Agr1_Attr2]|Agr1_Attr1  |[[A1At1Line1, A1At1Line2]]|[Agr1_Attr1, Agr1_Attr2, [[A1At1Line1, A1At1Line2]]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1, Agr4_Attr2], [Agr3_Attr1, Agr3_Attr2]]|[Agr3_Attr1, Agr3_Attr2]|Agr3_Attr1  |[[A3At1Line1, A3At1Line2]]|[Agr3_Attr1, Agr3_Attr2, [[A3At1Line1, A3At1Line2]]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1, Agr4_Attr2], [Agr3_Attr1, Agr3_Attr2]]|[Agr4_Attr1, Agr4_Attr2]|Agr4_Attr1  |[[A4At1Line1, A4At1Line2]]|[Agr4_Attr1, Agr4_Attr2, [[A4At1Line1, A4At1Line2]]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1, Agr5_Attr2]]                          |[Agr5_Attr1, Agr5_Attr2]|Agr5_Attr1  |[[A5At1Line1, A5At1Line2]]|[Agr5_Attr1, Agr5_Attr2, [[A5At1Line1, A5At1Line2]]]|
+-----------+-----------+----------------------------------------------------+------------------------+------------+--------------------------+----------------------------------------------------+
scala> outputDF.printSchema
root
|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Agreement_A1: string (nullable = true)
| | |-- Agreement_A2: string (nullable = true)
|-- DetailsExploded: struct (nullable = true)
| |-- Agreement_A1: string (nullable = true)
| |-- Agreement_A2: string (nullable = true)
|-- Agreement_A1: string (nullable = true)
|-- Lines: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Line_A1: string (nullable = true)
| | |-- Line_A2: string (nullable = true)
|-- DetailsWithAgreementA1Lines: struct (nullable = false)
| |-- Agreement_A1: string (nullable = true)
| |-- Agreement_A2: string (nullable = true)
| |-- Lines: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- Line_A1: string (nullable = true)
| | | |-- Line_A2: string (nullable = true)
scala> outputDF.groupBy("Activity_A1", "Activity_A2").agg(collect_list($"DetailsWithAgreementA1Lines") as "Details").show(false)
+-----------+-----------+------------------------------------------------------------------------------------------------------------+
|Activity_A1|Activity_A2|Details                                                                                                     |
+-----------+-----------+------------------------------------------------------------------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr1_Attr1, Agr1_Attr2, [[A1At1Line1, A1At1Line2]]]]                                                      |
|Act2_Attr1 |Act2_Attr2 |[[Agr3_Attr1, Agr3_Attr2, [[A3At1Line1, A3At1Line2]]], [Agr4_Attr1, Agr4_Attr2, [[A4At1Line1, A4At1Line2]]]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1, Agr5_Attr2, [[A5At1Line1, A5At1Line2]]]]                                                      |
+-----------+-----------+------------------------------------------------------------------------------------------------------------+
scala> outputDF.groupBy("Activity_A1", "Activity_A2").agg(collect_list($"DetailsWithAgreementA1Lines") as "Details").printSchema
root
|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- Agreement_A1: string (nullable = true)
| | |-- Agreement_A2: string (nullable = true)
| | |-- Lines: array (nullable = true)
| | | |-- element: struct (containsNull = true)
| | | | |-- Line_A1: string (nullable = true)
| | | | |-- Line_A2: string (nullable = true)
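One thing to watch in the output above: the inner join drops any agreement that has no match in the second dataframe, which is why Agr2_Attr1 is missing from the final Details of Act1_Attr1. If such agreements should be kept (with a null Lines), a left join is one option. The following is a minimal sketch of the whole pipeline as a single function, not the answer's exact code: the function name `enrichDetails`, its parameter names, and the intermediate column name `Detail` are made up for illustration, and it assumes the two input dataframes have the schemas printed above.

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, collect_list, explode, struct}

// Hypothetical helper: adds the matching Lines array to each element of Details.
def enrichDetails(activities: DataFrame, agreementLines: DataFrame): DataFrame =
  activities
    // One row per element of the Details array.
    .withColumn("Detail", explode(col("Details")))
    // Left join keeps Details elements that have no match; their Lines is null.
    .join(
      agreementLines,
      col("Detail.Agreement_A1") === agreementLines("Agreement_A1"),
      "left")
    // Collapse back to one row per activity.
    .groupBy(col("Activity_A1"), col("Activity_A2"))
    .agg(
      collect_list(
        struct(
          col("Detail.Agreement_A1").as("Agreement_A1"),
          col("Detail.Agreement_A2").as("Agreement_A2"),
          col("Lines")
        )
      ).as("Details"))
```

It would be called as `enrichDetails(df1, df2)`. Because the result only contains the grouping columns and the aggregated `Details`, the intermediate columns disappear without an explicit drop. Two caveats: `explode` discards rows whose Details array is null or empty (`explode_outer` keeps them), and `collect_list` does not guarantee the order of elements after a shuffle, so the order inside the rebuilt Details may differ from the input.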