(spark-xml) 使用 from_xml 函数解析 xml 列时仅接收 null
Posted
技术标签:
【中文标题】(spark-xml) 使用 from_xml 函数解析 xml 列时仅接收 null【英文标题】:(spark-xml) Receiving only null when parsing xml column using from_xml function 【发布时间】:2021-05-14 08:32:37 【问题描述】:我正在尝试使用 spark-xml 解析一个非常简单的 XML 字符串列,但我只能接收 null
值,即使正确填充了 XML。
我用来解析 xml 的 XSD 是:
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element name="note">
<xs:complexType>
<xs:sequence>
<xs:element type="xs:string" name="from"/>
<xs:element type="xs:string" name="to"/>
<xs:element type="xs:string" name="message"/>
</xs:sequence>
</xs:complexType>
</xs:element>
</xs:schema>
虽然列中以字符串形式显示的 XML 如下所示,但每个标签都已正确填充:
<?xml version="1.0" encoding="UTF-8"?>
<note>
<from>Jani</from>
<to>Tove</to>
<message>Remember me this weekend</message>
</note>
我用 scala 编写的 spark 代码是这样的:
// XML Schema
val schema = XSDToSchema.read("<the XSD as string>")
// Spark Structured Streaming (N.b. the column value contains the xml as string)
import spark.implicits._
var df = initSource(spark)
.withColumn("parsed", from_xml($"value", schema,
Map(
"mode" -> "FAILFAST",
"nullValue"-> "",
"rowTag" -> "note",
"ignoreSurroundingSpaces" -> "true"
)
))
.select($"value",$"parsed.note.from", $"parsed.note.to", $"parsed.note.message")
.writeStream
.format("console")
// .option("mode", "FAILFAST")
// .option("nullValue", "")
// .option("rowTag", "note")
// .option("ignoreSurroundingSpaces","true")
.outputMode("append")
.start()
.awaitTermination(30*1000)
打印此数据帧的架构(在 select 语句之前)将给出预期的架构
root
|-- value: string (nullable = true)
|-- parsed: struct (nullable = true)
| |-- note: struct (nullable = false)
| | |-- from: string (nullable = false)
| | |-- to: string (nullable = false)
| | |-- message: string (nullable = false)
但是在控制台中打印结果时,我得到的只是null
值,如下所示:
....
-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+----+----+-------+
| value|from| to|message|
+--------------------+----+----+-------+
|<?xml version="1....|null|null| null|
|<?xml version="1....|null|null| null|
|<?xml version="1....|null|null| null|
....
我认为这无关紧要,但此 xml 列的来源来自阅读定义如下的 Kafka 主题:
def initSource(spark: SparkSession) : DataFrame =
spark.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "localhost:9092")
.option("startingoffsets", "earliest")
.option("subscribe", "my-test-topic")
.load()
.selectExpr("CAST(value AS STRING)")
有其他人遇到过这个问题并解决了吗?我的选择已经不多了,我真的很感激这个提示:)
我使用的 spark-xml 版本是最新的 atm,0.12.0
和 spark 3.1.1
。
更新
我在调用 writeStream
后错误地传递了 spark-xml 选项,而是需要将它们作为 from_xml
函数的第三个参数传递。我仍然只得到空值...
【问题讨论】:
这不是答案:首先将awaitTermination(30*1000)
替换为awaitTermination()
否则您的查询将在30 秒内停止。将 .outputMode("append")
替换为 .outputMode("update")
以仅查看每批新更新的行。接下来尝试 .foreachbatch 编写器并检查 1 个数据帧的输出。如果它还包含 null,因为如果是这种情况,那么实际 XML 数据到提供的模式的映射错误
感谢您的评论@ChitralVerma,我故意放置这些选项以在每次执行中处理相同的数据并在其后终止流,因为我只是在测试 atm,不幸的是我没有看到XML 有任何问题,并且没有指示 XML 格式错误的日志。
格式错误的错误不会出现在日志中。
其实我认为它们确实出现了,如果你通过在 log4j.properties 中添加log4j.logger.com.databricks.spark=DEBUG
来提高日志级别,你可以获得更多细节这是我得到的一个例子:2021-05-17 12:32:11 WARN com.databricks.spark.xml.parsers.StaxXmlParser$:112 - Dropping malformed line: <?xml versi...
【参考方案1】:
最后让我大开眼界的是阅读 spark-xml documentation 中提到的部分:
用于验证 XML 的 XSD 文件的路径每行单独
这意味着模式匹配是通过每一行而不是整个 XML 完成的,在这种情况下,我的示例的模式需要类似于以下内容:
val schema = StructType(Array(
StructField("from", StringType, nullable = true),
StructField("to", StringType, nullable = true),
StructField("message", StringType, nullable = true)))
也可以使用 XSD 来完成:
<xs:schema attributeFormDefault="unqualified" elementFormDefault="qualified" xmlns:xs="http://www.w3.org/2001/XMLSchema">
<xs:element type="xs:string" name="from"/>
<xs:element type="xs:string" name="to"/>
<xs:element type="xs:string" name="message"/>
</xs:schema>
这两种声明模式的方法对我有用。希望对以后的人有所帮助。
【讨论】:
确实很有帮助!我想知道如何从定义文档模式的给定 XSD 中提取行模式,所以我提出了这个问题:***.com/questions/67596525/…以上是关于(spark-xml) 使用 from_xml 函数解析 xml 列时仅接收 null的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 spark-xml 包使用 XSD 解析 XML?