使用 UDF 从 PySpark Dataframe 解析嵌套的 XML 字段

Posted

技术标签:

【中文标题】使用 UDF 从 PySpark Dataframe 解析嵌套的 XML 字段【英文标题】:Parsing the nested XML fields from PySpark Dataframe using UDF 【发布时间】:2021-04-15 00:09:15 【问题描述】:

我有一个场景,我在数据框列中有 XML 数据。

sex updated_at visitors
F 1574264158 <?xml version="1.0" encoding="utf-8

我想解析 - 访问者列 - 使用 UDF 将嵌套的 XML 字段转换为 Dataframe 中的列

XML 格式

<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>

【问题讨论】:

【参考方案1】:

您可以在不使用 UDF 的情况下使用 xpath 查询:

df = spark.createDataFrame([['<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>']], ['visitors'])

df.show(truncate=False)
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|visitors                                                                                                                                                                          |
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|<?xml version="1.0" encoding="utf-8"?> <visitors> <visitor id="9615" age="68" sex="F" /> <visitor id="1882" age="34" sex="M" /> <visitor id="5987" age="23" sex="M" /> </visitors>|
+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+


df2 = df.selectExpr(
    "xpath(visitors, './visitors/visitor/@id') id",
    "xpath(visitors, './visitors/visitor/@age') age",
    "xpath(visitors, './visitors/visitor/@sex') sex"
).selectExpr(
    "explode(arrays_zip(id, age, sex)) visitors"
).select('visitors.*')

df2.show(truncate=False)
+----+---+---+
|id  |age|sex|
+----+---+---+
|9615|68 |F  |
|1882|34 |M  |
|5987|23 |M  |
+----+---+---+

如果你坚持使用 UDF:

import xml.etree.ElementTree as ET
import pyspark.sql.functions as F

@F.udf('array<struct<id:string, age:string, sex:string>>')
def parse_xml(s):
    root = ET.fromstring(s)
    return list(map(lambda x: x.attrib, root.findall('visitor')))
    
df2 = df.select(
    F.explode(parse_xml('visitors')).alias('visitors')
).select('visitors.*')

df2.show()
+----+---+---+
|  id|age|sex|
+----+---+---+
|9615| 68|  F|
|1882| 34|  M|
|5987| 23|  M|
+----+---+---+

【讨论】:

@Mr.Tssou 可以,但是由于性能原因,使用Spark SQL函数可以完成任务时不建议使用UDF 如果可能的话,你能做到吗?因为我想在不使用 xpath 的情况下查看其他方法 @Mr.Tssou 查看我为 UDF 解决方案编辑的答案。

以上是关于使用 UDF 从 PySpark Dataframe 解析嵌套的 XML 字段的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:从 Python 到 Pyspark 实现 lambda 函数和 udf

PySpark - 从 UDF 获取行索引

Pyspark:从 Struct 中识别 arrayType 列并调用 udf 将数组转换为字符串

使用 Scala 类作为带有 pyspark 的 UDF

PySpark 用户定义函数 (UDF) 创建新列

我需要创建一个 pyspark UDF,它通过比较从查询中输出一个表