How to join using a nested column in Spark dataframe

Posted: 2019-08-21 16:12:48

Question:

I have a dataframe with this schema:

|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Agreement_A1: string (nullable = true)
|    |    |-- Agreement_A2: string (nullable = true)

Data:

+-----------+-----------+--------------------------------------------------+
|Activity_A1|Activity_A2|Details                                           |
+-----------+-----------+--------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr2_Attr1,Agr2_Attr2], [Agr1_Attr1,Agr1_Attr2]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1,Agr4_Attr2], [Agr3_Attr1,Agr3_Attr2]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1,Agr5_Attr2]]                         |
+-----------+-----------+--------------------------------------------------+

The second dataframe has this schema:

|-- Agreement_A1: string (nullable = true)
|-- Lines: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Line_A1: string (nullable = true)
|    |    |-- Line_A2: string (nullable = true)

How can I join these two dataframes on the Agreement_A1 column, so that the schema of the resulting dataframe looks like this:

|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Agreement_A1: string (nullable = true)
|    |    |-- Agreement_A2: string (nullable = true)
|    |    |-- Lines: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- Line_A1: string (nullable = true)
|    |    |    |    |-- Line_A2: string (nullable = true)

Answer 1:

Hope this helps. You need to un-nest (explode) "Details" and join the result with your second dataframe on "Agreement_A1". Then build the columns as needed.
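
For completeness, here is one way the two example dataframes used below could be built in spark-shell. This setup is an assumption reconstructed from the show/printSchema output that follows (it is not part of the original answer); the case class and variable names are illustrative.

// Assumed setup, reconstructed from the output shown below.
// In spark-shell, `spark` and spark.implicits._ are already in scope.
import org.apache.spark.sql.functions._   // explode, struct, collect_list used later
import spark.implicits._

case class Agreement(Agreement_A1: String, Agreement_A2: String)
case class Line(Line_A1: String, Line_A2: String)

val df1 = Seq(
  ("Act1_Attr1", "Act1_Attr2", Seq(Agreement("Agr2_Attr1", "Agr2_Attr2"), Agreement("Agr1_Attr1", "Agr1_Attr2"))),
  ("Act2_Attr1", "Act2_Attr2", Seq(Agreement("Agr4_Attr1", "Agr4_Attr2"), Agreement("Agr3_Attr1", "Agr3_Attr2"))),
  ("Act3_Attr1", "Act3_Attr2", Seq(Agreement("Agr5_Attr1", "Agr5_Attr2")))
).toDF("Activity_A1", "Activity_A2", "Details")

val df2 = Seq(
  ("Agr1_Attr1", Seq(Line("A1At1Line1", "A1At1Line2"))),
  ("Agr3_Attr1", Seq(Line("A3At1Line1", "A3At1Line2"))),
  ("Agr4_Attr1", Seq(Line("A4At1Line1", "A4At1Line2"))),
  ("Agr5_Attr1", Seq(Line("A5At1Line1", "A5At1Line2"))),
  ("Agr6_Attr1", Seq(Line("A6At1Line1", "A6At1Line2")))
).toDF("Agreement_A1", "Lines")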

scala> df1.show(false)
+-----------+-----------+----------------------------------------------------+
|Activity_A1|Activity_A2|Details                                             |
+-----------+-----------+----------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr2_Attr1, Agr2_Attr2], [Agr1_Attr1, Agr1_Attr2]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1, Agr4_Attr2], [Agr3_Attr1, Agr3_Attr2]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1, Agr5_Attr2]]                          |
+-----------+-----------+----------------------------------------------------+


scala> df1.printSchema
root
|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Agreement_A1: string (nullable = true)
|    |    |-- Agreement_A2: string (nullable = true)


scala> df2.show(false)
+------------+--------------------------+
|Agreement_A1|Lines                     |
+------------+--------------------------+
|Agr1_Attr1  |[[A1At1Line1, A1At1Line2]]|
|Agr3_Attr1  |[[A3At1Line1, A3At1Line2]]|
|Agr4_Attr1  |[[A4At1Line1, A4At1Line2]]|
|Agr5_Attr1  |[[A5At1Line1, A5At1Line2]]|
|Agr6_Attr1  |[[A6At1Line1, A6At1Line2]]|
+------------+--------------------------+


scala> df2.printSchema
root
|-- Agreement_A1: string (nullable = true)
|-- Lines: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Line_A1: string (nullable = true)
|    |    |-- Line_A2: string (nullable = true)


scala> val outputDF = df1.withColumn("DetailsExploded", explode($"Details")).join(
    |   df2, $"DetailsExploded.Agreement_A1" === $"Agreement_A1").withColumn(
    |     "DetailsWithAgreementA1Lines", struct($"DetailsExploded.Agreement_A1" as "Agreement_A1", $"DetailsExploded.Agreement_A2" as "Agreement_A2", $"Lines"))
outputDF: org.apache.spark.sql.DataFrame = [Activity_A1: string, Activity_A2: string ... 5 more fields]

scala> outputDF.show(false)
+-----------+-----------+----------------------------------------------------+------------------------+------------+--------------------------+----------------------------------------------------+
|Activity_A1|Activity_A2|Details                                             |DetailsExploded         |Agreement_A1|Lines                     |DetailsWithAgreementA1Lines                         |
+-----------+-----------+----------------------------------------------------+------------------------+------------+--------------------------+----------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr2_Attr1, Agr2_Attr2], [Agr1_Attr1, Agr1_Attr2]]|[Agr1_Attr1, Agr1_Attr2]|Agr1_Attr1  |[[A1At1Line1, A1At1Line2]]|[Agr1_Attr1, Agr1_Attr2, [[A1At1Line1, A1At1Line2]]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1, Agr4_Attr2], [Agr3_Attr1, Agr3_Attr2]]|[Agr3_Attr1, Agr3_Attr2]|Agr3_Attr1  |[[A3At1Line1, A3At1Line2]]|[Agr3_Attr1, Agr3_Attr2, [[A3At1Line1, A3At1Line2]]]|
|Act2_Attr1 |Act2_Attr2 |[[Agr4_Attr1, Agr4_Attr2], [Agr3_Attr1, Agr3_Attr2]]|[Agr4_Attr1, Agr4_Attr2]|Agr4_Attr1  |[[A4At1Line1, A4At1Line2]]|[Agr4_Attr1, Agr4_Attr2, [[A4At1Line1, A4At1Line2]]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1, Agr5_Attr2]]                          |[Agr5_Attr1, Agr5_Attr2]|Agr5_Attr1  |[[A5At1Line1, A5At1Line2]]|[Agr5_Attr1, Agr5_Attr2, [[A5At1Line1, A5At1Line2]]]|
+-----------+-----------+----------------------------------------------------+------------------------+------------+--------------------------+----------------------------------------------------+
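
Note that Agr2_Attr1 from the first row's Details array no longer appears above: this is an inner join, so exploded Details entries with no matching Agreement_A1 in df2 are dropped. If those entries should be kept (with a null Lines), a left outer join could be used instead. A sketch of that variant, not part of the original answer:

val outputDFKeepUnmatched = df1
  .withColumn("DetailsExploded", explode($"Details"))
  .join(df2, $"DetailsExploded.Agreement_A1" === df2("Agreement_A1"), "left_outer")
  .withColumn("DetailsWithAgreementA1Lines",
    struct(
      $"DetailsExploded.Agreement_A1" as "Agreement_A1",
      $"DetailsExploded.Agreement_A2" as "Agreement_A2",
      $"Lines"))   // Lines is null for agreements that have no match in df2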


scala> outputDF.printSchema
root
|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Agreement_A1: string (nullable = true)
|    |    |-- Agreement_A2: string (nullable = true)
|-- DetailsExploded: struct (nullable = true)
|    |-- Agreement_A1: string (nullable = true)
|    |-- Agreement_A2: string (nullable = true)
|-- Agreement_A1: string (nullable = true)
|-- Lines: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Line_A1: string (nullable = true)
|    |    |-- Line_A2: string (nullable = true)
|-- DetailsWithAgreementA1Lines: struct (nullable = false)
|    |-- Agreement_A1: string (nullable = true)
|    |-- Agreement_A2: string (nullable = true)
|    |-- Lines: array (nullable = true)
|    |    |-- element: struct (containsNull = true)
|    |    |    |-- Line_A1: string (nullable = true)
|    |    |    |-- Line_A2: string (nullable = true)



scala> outputDF.groupBy("Activity_A1", "Activity_A2").agg(collect_list($"DetailsWithAgreementA1Lines") as "Details").show(false)
+-----------+-----------+------------------------------------------------------------------------------------------------------------+
|Activity_A1|Activity_A2|Details                                                                                                     |
+-----------+-----------+------------------------------------------------------------------------------------------------------------+
|Act1_Attr1 |Act1_Attr2 |[[Agr1_Attr1, Agr1_Attr2, [[A1At1Line1, A1At1Line2]]]]                                                      |
|Act2_Attr1 |Act2_Attr2 |[[Agr3_Attr1, Agr3_Attr2, [[A3At1Line1, A3At1Line2]]], [Agr4_Attr1, Agr4_Attr2, [[A4At1Line1, A4At1Line2]]]]|
|Act3_Attr1 |Act3_Attr2 |[[Agr5_Attr1, Agr5_Attr2, [[A5At1Line1, A5At1Line2]]]]                                                      |
+-----------+-----------+------------------------------------------------------------------------------------------------------------+


scala> outputDF.groupBy("Activity_A1", "Activity_A2").agg(collect_list($"DetailsWithAgreementA1Lines") as "Details").printSchema
root
|-- Activity_A1: string (nullable = true)
|-- Activity_A2: string (nullable = true)
|-- Details: array (nullable = true)
|    |-- element: struct (containsNull = true)
|    |    |-- Agreement_A1: string (nullable = true)
|    |    |-- Agreement_A2: string (nullable = true)
|    |    |-- Lines: array (nullable = true)
|    |    |    |-- element: struct (containsNull = true)
|    |    |    |    |-- Line_A1: string (nullable = true)
|    |    |    |    |-- Line_A2: string (nullable = true)
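
Putting the explode, join, struct and groupBy/collect_list steps together, the same transformation can also be written as one chained expression. This is a sketch equivalent to the steps shown above, assuming the same df1 and df2:

val result = df1
  .withColumn("DetailsExploded", explode($"Details"))                    // one row per Details element
  .join(df2, $"DetailsExploded.Agreement_A1" === df2("Agreement_A1"))    // attach Lines to each agreement
  .groupBy("Activity_A1", "Activity_A2")
  .agg(collect_list(
    struct(
      $"DetailsExploded.Agreement_A1" as "Agreement_A1",
      $"DetailsExploded.Agreement_A2" as "Agreement_A2",
      $"Lines")) as "Details")                                           // rebuild the Details array with Lines nested inside

result.printSchema   // should match the target schema shown in the question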
