如何使用 Spark 从 XML 复制到 SQL Server

Posted

技术标签:

【中文标题】如何使用 Spark 从 XML 复制到 SQL Server【英文标题】:How to use Spark to copy from XML to SQL Server 【发布时间】:2019-03-27 15:56:10 【问题描述】:

我需要打开存储在 Azure Datalake Store 上的多个 XML 文件的内容并将其复制到 Azure SQL DB。这是 XML 文件结构:

<?xml version="1.0" encoding="utf-8"?>
<FileSummary xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" 
    xsi:noNamespaceSchemaLocation="invoices.xsd">
      <Header>
      <SequenceNumber>1</SequenceNumber>
      <Description>Hello</Description>
      <ShipDate>20180101</ShipDate>
     </Header>
     <FileInvoices>
      <InvoiceNumber>000000A</InvoiceNumber>
      <InvoiceHeader>
       <InvoiceHeaderDate>201800201</InvoiceHeaderDate>
       <InvoiceHeaderDescription>XYZ</InvoiceHeaderDescription>
      </InvoiceHeader>
      <InvoiceItems>
       <ItemId>000001</ItemId>
       <ItemQuantity>000010</ItemQuantity>
       <ItemPrice>000100</ItemPrice>
      </InvoiceItems>
     </FileInvoices>
     <FileInvoices>
      <InvoiceNumber>000000B</InvoiceNumber>
      <InvoiceHeader>
       <InvoiceHeaderDate>201800301</InvoiceHeaderDate>
       <InvoiceHeaderDescription>ABC</InvoiceHeaderDescription>
      </InvoiceHeader>
      <InvoiceItems>
       <ItemId>000002</ItemId>
       <ItemQuantity>000020</ItemQuantity>
       <ItemPrice>000200</ItemPrice>
      </InvoiceItems>
     </FileInvoices>
</FileSummary>

所以我使用 Azure Databricks 将 Datalake 存储挂载为“/mnt/testdata”,然后我尝试使用以下命令打开上面的示例文件

dfXml = (sqlContext.read.format("xml") # requires maven library <HyukjinKwon:spark-xml:0.1.1-s_2.11>
         .options(rootTag='FileSummary')
         .load('/mnt/testdata/data/invoices_file1.xml')) 
dfXml.cache()
print ("Number of records in this dataframe: " + str(dfXml.count())) 

dfXml.printSchema()

返回以下结果:

dfXml:pyspark.sql.dataframe.DataFrame
FileInvoices:array
element:struct
InvoiceHeader:struct
InvoiceHeaderDate:long
InvoiceHeaderDescription:string
InvoiceItems:struct
ItemId:long
ItemPrice:long
ItemQuantity:long
InvoiceNumber:string
Header:struct
Description:string
SequenceNumber:long
ShipDate:long
xmlns:xsi:string
xsi:noNamespaceSchemaLocation:string
Number of records in this dataframe: 1
root
 |-- FileInvoices: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- InvoiceHeader: struct (nullable = true)
 |    |    |    |-- InvoiceHeaderDate: long (nullable = true)
 |    |    |    |-- InvoiceHeaderDescription: string (nullable = true)
 |    |    |-- InvoiceItems: struct (nullable = true)
 |    |    |    |-- ItemId: long (nullable = true)
 |    |    |    |-- ItemPrice: long (nullable = true)
 |    |    |    |-- ItemQuantity: long (nullable = true)
 |    |    |-- InvoiceNumber: string (nullable = true)
 |-- Header: struct (nullable = true)
 |    |-- Description: string (nullable = true)
 |    |-- SequenceNumber: long (nullable = true)
 |    |-- ShipDate: long (nullable = true)
 |-- xmlns:xsi: string (nullable = true)
 |-- xsi:noNamespaceSchemaLocation: string (nullable = true)

所以看起来上面的命令确实正确读取了文件,当然我能够连接到我规范化的 Azure SQL DB 并将记录写入特定的表:

dfXml.write.jdbc(url=jdbcUrl, table="dest_table", mode="overwrite", properties=connectionProperties)

,但是这种方法需要设置一些嵌套循环和大量手动任务来跟踪每个表的键并尊重不利用 Spark 架构的引用完整性,所以我现在想知道是否有最佳实践(或预建库),它以更自动化和可扩展的方式完成此任务。

我希望这是一个常见的需求,所以理想情况下我会使用一个库来读取开头显示的完整 XML 结构,并自动提取信息以插入到规范化表中。

非常感谢您的任何建议。

毛罗

【问题讨论】:

要从 XML 中解析数据,您可以使用标准库中的 ElementTree module。 【参考方案1】:

取决于您要执行的操作以及表结构的外观。我假设您正在尝试使用 spark 处理许多文件。并且还想将数据加载到不同的规范化表中

例如您可能希望将标题写入一个表,header->fileInvoices 是一对多的关系,因此可能是另一个表。

当您使用 load(filename*.xml) 读取多个 xml 文件时,您 想要将 FileSummary 设为 rowtag。然后你会有多个 数据框中的行,每个文件摘要一个。

您可以将标题列选择到另一个数据框中并写入 到一张桌子。

FileInvoices 是结构数组,您可以将它们分解成行 并将它们存储到另一个表中。

此外,如果每张发票可以包含多个项目,您可以再做一个 分解成行并将它们存储到另一个表中

或者您可以进行两次分解并将生成的数据框加载到一个大的非规范化表中。

这是一篇关于爆炸如何工作的文章 https://hadoopist.wordpress.com/2016/05/16/how-to-handle-nested-dataarray-of-structures-or-multiple-explodes-in-sparkscala-and-pyspark/

【讨论】:

【参考方案2】:

我正在使用 spark-shell 在下面执行。我相信 xml 结构正在重复。 您需要创建/引用一个与 xml 文件相关的架构。 你可以使用brickhouse udf jar。 那么

1.创建如下函数

sql(""" create temporary function numeric_range as brickhouse.udf.collect.NumericRange""")

2.使用架构

var df=sqlContext.read.format("com.databricks.spark.xml").option("rowTag","FileSummary").load("location of schema file")

val schema=df.schema

3.var df1=sqlContext.read.format("com.databricks.spark.xml").option("rowTag","FileSummary").schema(schema).load("location of actual xml file")

df1.registerTempTable("XML_Data")

4.您需要将 FileInvoices 展平如下

val df2=sql("select array_index(FileInvoices,n) as FileInvoices from XML_Data lateral view numeric_range(size(FileInvoices))n1 as n""").registerTempTable("xmlData2")

一旦每个都转换为 Struct 就更容易遍历或使用 explode 使用 FileInvoices.InvoiceHeader.InvoiceHeaderDate

val jdbcUsername = "<username>"
val jdbcPassword = "<password>"
val jdbcHostname = "<hostname>" //typically, this is in the form or servername.database.windows.net
val jdbcPort = 1433
val jdbcDatabase ="<database>"

val jdbc_url = s"jdbc:sqlserver://$jdbcHostname:$jdbcPort;database=$jdbcDatabase;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=60;"

val connectionProperties = new Properties()
connectionProperties.put("user", s"$jdbcUsername")
connectionProperties.put("password", s"$jdbcPassword")

spark.table("").write.jdbc(jdbc_url, "xmlData2", connectionProperties)

【讨论】:

【参考方案3】:

谢谢苏巴什,阿南德。 关于 Subash 的回答,我没有架构文件,所以我修改了他的第 2 步,将“实际 xml 文件的位置”替换为“实际 xml 文件的位置”,它确实有效:在第 3 步之后,如果我只是运行

df2=sql("select * from XML_Data")

然后我跑

from pyspark.sql.functions import explode
df3=df2.withColumn("FileInvoices", explode(df2.FileInvoices))
display(df3)

因此,它在多行中复制了相同的单个 Header 结构,在 FileInvoices 列中,我有一个不同的发票结构: exploded FileInvoices

所以看起来我离最终目标越来越近了,但是我仍然想以正确的顺序自动创建记录以避免破坏参照完整性。

但在此之前,我非常感谢您的反馈。

再次感谢,

毛罗

【讨论】:

以上是关于如何使用 Spark 从 XML 复制到 SQL Server的主要内容,如果未能解决你的问题,请参考以下文章

通过spark-sql快速读取hive中的数据

如何通过 spark-sql 复制表

实例化“org.apache.spark.sql.hive.HiveExternalCatalog”时出错

如何将数据从 Spark SQL 导出到 CSV

Spark-sql CLI 在运行查询时仅使用 1 个执行程序

databricks spark sql复制到不加载数据