在 spark 中分解多个数组列以更改输入模式

Posted

技术标签:

【中文标题】在 spark 中分解多个数组列以更改输入模式【英文标题】:Exploding multiple array columns in spark for a changing input schema 【发布时间】:2021-05-14 18:08:08 【问题描述】:

以下是我的示例架构。

|-- provider: string (nullable = true)
 |-- product: string (nullable = true)
 |-- asset_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- creation_date: string (nullable = true)
 |-- provider_id: string (nullable = true)
 |-- asset: string (nullable = true)
 |-- asset_clas: string (nullable = true)
 |-- Actors: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Actors_Display: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Audio_Type: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Billing_ID: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- Bit_Rate: array (nullable = true)
 |    |-- element: string (containsNull = false)
 |-- CA_Rating: array (nullable = true)
 |    |-- element: string (containsNull = false)

我需要分解所有数组类型的列。我有大约 80 多列,并且列不断变化。 我目前正在使用explode(array_zip)

   val df= sourcedf.select($"provider",$"asset_name",$"description",$"creation_date",$"provider_id",$"asset_id",$"asset_class",$"product",$"provider_id",$"eligible_platform",$"actors",$"category",
explode_outer(arrays_zip($"Actors_Display",$"Audio_Type",$"Billing_ID",$"Bit_Rate",$"CA_Rating")


val parsed_output = df.select(col("provider"),("asset_name"),col("description"),col("creation_date"),col("product"),col("provider"),
    col("povider_id"),col("asset_id"),col("asset_class"),
  col("col.Actors_Display"),col("col.Audio_Type"),col("col.Billing_ID"),col("col.Bit_Rate"),col("col.CA_Rating"))

通过使用,我可以得到上面的输出。但这仅适用于一个特定文件。就我而言,会经常添加新列。那么,是否有任何功能可以分解多个列以更改架构并从文件中选择非数组列。 有人可以举个例子吗

注意:只有数组列是不断变化的,其余的都是不变的。

以下是样本数据

<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<ADIL>
  <Meta>
    <AMS Asset_Name="asd" Provider="Level" Product="MOTD" Version_Major="1" Version_Minor="0" Description="ZXC" Creation_Date="2009-12-03" Provider_ID="qwer.com" Asset_ID="A12we" Asset_Class="package"/>
    <App_Data App="MOD" Name="Actors" Value="CableLa1.1"/>
    <App_Data App="MOD" Name="Actors_Display" Value="RTY"/>
    <App_Data App="MOD" Name="Audio_Type" Value="FGH"/>
  </Meta>
  <Asset>
    <Meta>
      <AMS Asset_Name="bnm" Provider="Level Film" Product="MOTD" Version_Major="1" Version_Minor="0" Description="bnj7" Creation_Date="2009-12-03" Provider_ID="levelfilm.com" Asset_ID="DDDB0610072533182333" Asset_Class="title"/>
      App_Data App="rt" Name="Billing_ID" Value="2020-12-29T00:00:00"/>
      <App_Data App="MOD" Name="Bit_Rate" Value="2021-12-29T23:59:59"/>
      <App_Data App="MOD" Name="CA_Rating" Value="16.99"/>
      </Meta>
    <Asset>
      <Meta>
       <AMS Asset_Name="atysd" Provider="Level1" Product="MOTD2" Version_Major="1" Version_Minor="0" Description="ZXCY" Creation_Date="2009-12-03" Provider_ID="qweDFtrr.com" Asset_ID="A12FGwe" Asset_Class="review"/>
     

这是xml数据。最初,解析此数据并将所有名称属性值转换为列名,并将所有“值”属性值转换为列名的值。这个XML有重复的标签,所以解析后的最终结果是数组列,我在解析逻辑的最后使用了collect_list。

这是解析后的示例输出。

+-------------------+-------------------+-----------------+------------+--------------+
|Actors               |Actors_Display    |Audio_Type       |Billing_ID  |Bit_rate 
+-------------+---------------+-----------------------------------------+------------
|["movie","cinema",] | ["Dolby 5.1"]     | ["High", "low"] | ["GAR15"]|  ["15","14"]         
+-------------+-----+-------------------+-----------------+--------------+----------

【问题讨论】:

你能分享一些数据吗? @Kafels 我已经用示例数据编辑了这个问题。请协助。谢谢 【参考方案1】:

假设您要分解所有 ArrayType 列(否则,相应地进行过滤):

val df = Seq(
  (1, "xx", Seq(10, 20), Seq("a", "b"), Seq("p", "q")),
  (2, "yy", Seq(30, 40), Seq("c", "d"), Seq("r", "s"))
).toDF("c1", "c2", "a1", "a2", "a3")

import org.apache.spark.sql.types.StructField, ArrayType

val arrCols = df.schema.fields
  .collectcase StructField(name, _: ArrayType, _, _) => name
  .map(col)

val otherCols = df.columns.map(col) diff arrCols

df.withColumn("arr_zip", explode_outer(arrays_zip(arrCols: _*)))
  .select(otherCols.toList ::: $"arr_zip.*" :: Nil: _*)
  .show
+---+---+---+---+---+
| c1| c2| a1| a2| a3|
+---+---+---+---+---+
|  1| xx| 10|  a|  p|
|  1| xx| 20|  b|  q|
|  2| yy| 30|  c|  r|
|  2| yy| 40|  d|  s|
+---+---+---+---+---+

【讨论】:

感谢您的回复,它按预期工作。

以上是关于在 spark 中分解多个数组列以更改输入模式的主要内容,如果未能解决你的问题,请参考以下文章

在 PySpark 中分解 JSON 中不存在的密钥

我们可以使用 IN 语句在多个查询中分解 SQL 连接吗

为啥要在 NP 中分解,而不是在 P 中分解?

在 Spark 中独立分解多个列

在单个 Spark 提交作业中分别处理多个文件

Spark Scala - 如何为每个组创建新列,然后在 spark 数据框中分解列值