在 PySpark 中使用 XSD

Posted

技术标签:

【中文标题】在 PySpark 中使用 XSD【英文标题】:Using XSD in PySpark 【发布时间】:2021-11-05 07:15:37 【问题描述】:

我正在 Azure Synapse 中构建一个数据仓库,其中一个来源是大约 20 种不同类型的 XML 文件(具有不同的 XSD 方案)和 1 个基本方案。

我正在寻找的是获取所有 XML 元素并将它们存储在我的数据湖中的文件中(每种类型 1 个)。为此,我需要为每个元素指定唯一名称,例如将整个路径作为名称。我尝试使用所有元素名称为每种类型定义字典,但这是一项相当多的工作。为了自动化这个(XSD 每年更新),我尝试在 Excel 和 VBA 中编写代码,但是 XSD 非常复杂,具有嵌套的复杂类型等。 下面是 baseschema.xsd 的 sn-p:

<?xml version="1.0" encoding="UTF-8"?>
<xs:schema targetNamespace="http://www.website.org/typ/1/baseschema/schema" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:iwmo="http://www.website.org/typ/1/baseschema/schema">
    <xs:complexType name="Complex_Address">
        ...
        <xs:sequence>
            <xs:element name="Home" type="iwmo:Complex_House" minOccurs="0">
                ...
            </xs:element>
            <xs:element name="Postalcode" type="iwmo:Simple_Postalcode" minOccurs="0">
                ...
            </xs:element>
            <xs:element name="Streetname" type="iwmo:Simple_Streetname" minOccurs="0">
                ...
            </xs:element>
            <xs:element name="Areaname" type="iwmo:Simple_Areaname" minOccurs="0">
                ...
            </xs:element>
            <xs:element name="CountryCode" type="iwmo:Simple_CountryCode" minOccurs="0">
                ...
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="Complex_House">
        ...
        <xs:sequence>
            <xs:element name="Housenumber" type="iwmo:Simple_Housenumber">
                ...
            </xs:element>
            <xs:element name="Houseletter" type="iwmo:Simple_Houseletter" minOccurs="0">
                ...
            </xs:element>
            <xs:element name="HousenumberAddition" type="iwmo:Simple_HousenumberAddition" minOccurs="0">
                ...
            </xs:element>
            <xs:element name="IndicationAddress" type="iwmo:Simple_IndicationAddress" minOccurs="0">
                ...
            </xs:element>
        </xs:sequence>
    </xs:complexType>

    <xs:complexType name="Complex_MessageIdentification">
            ...
        <xs:sequence>
            <xs:element name="Identification" type="iwmo:Simple_IdentificationMessage">
                ...
            </xs:element>
            <xs:element name="Date" type="iwmo:Simple_Date">
                ...
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="Complex_Product">
            ...
        <xs:sequence>
            <xs:element name="Categorie" type="iwmo:Simple_ProductCategory">
                ...
            </xs:element>
            <xs:element name="Code" type="iwmo:Simple_ProductCode" minOccurs="0">
                ...
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="Complex_XsdVersion">
        <xs:sequence>
            <xs:element name="BaseschemaXsdVersion" type="iwmo:Simple_Version">
            </xs:element>
            <xs:element name="MessageXsdVersion" type="iwmo:Simple_Version">
            </xs:element>
        </xs:sequence>
    </xs:complexType>

这里是 1 个消息类型的 xsd 的 sn-p:

<?xml version="1.0" encoding="UTF-8"?>
<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:typ="http://www.website.org/typ/1/baseschema/schema" xmlns:type1="http://www.website.org/typ/1/type1/schema" targetNamespace="http://www.website.org/typ/1/type1/schema" elementFormDefault="qualified">
    <xs:import namespace="http://www.website.org/typ/1/baseschema/schema" schemaLocation="baseschema.xsd"></xs:import>
    <xs:element name="Message" type="type1:Root"></xs:element>
    <xs:complexType name="Root">
        ...
        <xs:sequence>
            <xs:element name="Header" type="type1:Header"></xs:element>
            <xs:element name="Client" type="type1:Client"></xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="Header">
        <xs:sequence>
            <xs:element name="Person" type="typ:Simple_SpecialCode">
                ...
            </xs:element>
            <xs:element name="MessageIdentification" type="typ:Complex_MessageIdentification">
                ...
            </xs:element>
            <xs:element name="XsdVersion" type="typ:Complex_XsdVersion">
                ...
            </xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="Client">
                ...
        <xs:sequence>
            <xs:element name="AssignedProducts" type="type1:AssignedProducts"></xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="AssignedProducts">
        <xs:sequence>
            <xs:element name="AssignedProduct" type="type1:AssignedProduct"  maxOccurs="unbounded"></xs:element>
        </xs:sequence>
    </xs:complexType>
    <xs:complexType name="AssignedProduct">
        ...
        <xs:sequence>
            <xs:element name="ToewijzingNummer" type="typ:Simple_Nummer">
                ...
            </xs:element>
            <xs:element name="Product" type="typ:Complex_Product" minOccurs="0">
                ...
            </xs:element>
        </xs:sequence>
    </xs:complexType>
</xs:schema>

那么这将是所需的输出:

Header_Person
Header_MessageIdentification_Identification
Header_MessageIdentification_Date
Header_XsdVersion_BaseschemaXsdVersion
Header_XsdVersion_MessageXsdVersion
Client_AssignedProduct_ToewijzingNummer
Client_AssignedProduct_Product_Category
Client_AssignedProduct_Product_Code

在基本模式中,我还添加了一个嵌套的复杂类型,以显示复杂性。

是否有某种包或 Python 中的东西可以帮助我实现这一目标?此外,可以在文本文件中创建此元素列表的工具也很棒,然后我可以轻松地将其复制到变量中。

我不确定我是否清楚我的要求,是否将其发布在具有正确标签的正确组中,但我希望有人能指出一个好的解决方案。

罗纳德

【问题讨论】:

我找到了一种解决方法,我将 xsd 中的所有字段都放在变量中。 【参考方案1】:

毕竟我找到了一种解决方法,我将 xsds 中的所有字段都放在变量中。这并不理想,但任何其他方式都太复杂了。

【讨论】:

以上是关于在 PySpark 中使用 XSD的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark:如何在Python 3中使用pyspark

PySpark:如何在列中使用 Or 进行分组

如何在 Pyspark 中使用 Scala 函数?

如何在 Databricks 的 PySpark 中使用在 Scala 中创建的 DataFrame

在 PySpark 中使用列条件替换空值

Pyspark 使用自定义函数