如何从 SPARK SCALA 中的 XML 模式中获取列名?

Posted

技术标签:

【中文标题】如何从 SPARK SCALA 中的 XML 模式中获取列名?【英文标题】:How to fetch column names from XML schema in SPARK SCALA? 【发布时间】:2021-05-28 06:35:18 【问题描述】:

我有一个要求,我需要处理包含 XML 的表中的列。我正在尝试根据标签将 XML 列转换为多个单独的列。我正在使用 spark_xml 类来执行相同的操作。 我参考了问题 parsing XML columns from PySpark Dataframe using UDF 但是这里全部在 Pyspark 中处理,我需要在 SCALA 中处理。

我已经到了可以制作解析列的地步了。 我需要分解数据并将 XML 标记转换为列名。 我需要 SCALA 中该问题的以下行的等效项

df2 = parsed.select(*parsed.columns[:-1],F.explode(F.col('parsed').getItem('visitor')))    

new_col_names = [s.split(':')[0] for s in payloadSchema['visitor'].simpleString().split('<')[-1].strip('>>').split(',')]

添加 XML

<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>

输出:

> # +---+--------------------+----+----+----+
> # | id|            visitors|_age| _id|_sex|
> # +---+--------------------+----+----+----+
> # |  1|<?xml version="1....|  68|9615|   F|
> # |  1|<?xml version="1....|  34|1882|   M|
> # |  1|<?xml version="1....|  23|5987|   M|
> # +---+--------------------+----+----+----+

【问题讨论】:

您可以添加您的示例 xml 和预期输出吗? 查看这篇文章 - ***.com/questions/62379533/… 可能会对您有所帮助。 已添加xml。 【参考方案1】:

使用org.json将xml转换为json。

示例 XML 数据

val xmlData = """<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>"""

UDF 函数

val parse = udf((value: String) => 
    import org.json._
    XML.toJSONObject(value).toString
  
)

转换后的 json 数据的架构。

import org.apache.spark.sql.types._

val schema = DataType.fromJson(""""type":"struct","fields":["name":"visitors","type":"type":"struct","fields":["name":"visitor","type":"type":"array","elementType":"type":"struct","fields":["name":"age","type":"long","nullable":true,"metadata":,"name":"id","type":"long","nullable":true,"metadata":,"name":"sex","type":"string","nullable":true,"metadata":],"containsNull":true,"nullable":true,"metadata":],"nullable":true,"metadata":]""").asInstanceOf[StructType]
scala> schema.printTreeString
root
 |-- visitors: struct (nullable = true)
 |    |-- visitor: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- age: long (nullable = true)
 |    |    |    |-- id: long (nullable = true)
 |    |    |    |-- sex: string (nullable = true)
 df
 .withColumn(
     "parsed_xml", 
     from_json(parse($"xml"),schema)
    )
 .select(
        $"id",
        $"xml",
        explode_outer($"parsed_xml.visitors.visitor").as("visitors")
    )
 .select(
     $"id",
     $"xml",
     $"visitors.*"
    )
 .show(false)

最终输出

+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+----+---+
|id |xml                                                                                                                                                                               |age|id  |sex|
+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+----+---+
|1  |<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>|68 |9615|F  |
|1  |<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>|34 |1882|M  |
|1  |<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>|23 |5987|M  |
+---+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+---+----+---+

【讨论】:

谢谢你,但 xml 的架构不是固定的。实时可能缺少很少的标签,这就是为什么我需要在运行时生成架构,就像它在示例中所做的那样提供。 获取 xsd 格式的 XML 架构,然后很容易从中生成架构。

以上是关于如何从 SPARK SCALA 中的 XML 模式中获取列名?的主要内容,如果未能解决你的问题,请参考以下文章

使用 scala 从 HDFS 读取输入 xml 数据

Spark 结构化流:Scala 中的模式推理

如何在 Spark Scala 中的 Schema RDD [从案例类中创建] 中查找重复项以及相应的重复计数?

Scala - 如何在 Spark 的 map 函数中实现 Try

如何使用 Scala 从 Spark 中的列表或数组创建行

在本地文件系统(不是HDFS)中使用Scala读取Apache Spark中的文件时如何更改输入块大小[重复]