如何从 Spark-Structured-Streaming 中的嵌套 XML_String 中提取信息
Posted
技术标签:
【中文标题】如何从 Spark-Structured-Streaming 中的嵌套 XML_String 中提取信息【英文标题】:How to extract information from a nested XML_String in Spark-Structured-Streaming 【发布时间】:2020-12-15 14:02:07 【问题描述】:我有一个连接到 ActiveMQ 的火花结构应用程序。应用程序接收来自主题的消息。这些消息采用 StringXML 的形式。我想从这个嵌套 XML 中提取信息。我该怎么做?
我提到了this post,但无法在 Scala 中实现类似的东西。
XML 格式:
<CofiResults>
<ExecutionTime>20201103153839</ExecutionTime>
<FilterClass>S </FilterClass>
<InputData format="something" id="someID"><ns2:FrdReq xmlns:ns2="http://someone.com">
<HeaderSegment xmlns="https://somelink.com">
<Version>6</Version>
<SequenceNb>1</SequenceNb>
</HeaderSegment>
.
.
.
我的代码:
val df = spark.readStream
.format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
.option("brokerUrl", brokerUrl_)
.option("topic", topicName_)
.option("persistence", "memory")
.option("cleanSession", "true")
.option("username", username_)
.option("password", password_)
.load()
val payload_ = df.select('payload cast "string") // This payload IS the XMLString
现在我需要从上述XML中提取ExecutionTime
、Version
等字段。
【问题讨论】:
使用import xml.etree.ElementTree
我对 Scala 还很陌生......并且不知道如何做到这一点
在spark sql中使用xpath怎么样?spark.apache.org/docs/latest/api/sql/#xpath
您能解释一下如何在 UDF 中使用它吗?谢谢
对不起,我的意思是spark.sql("select xpath(payload, <xpathquery>) from dataframe")
。 payload 是 xml 字符串列
非常感谢...在将数据帧转换为 tempTable 然后运行 sql 命令后,它工作正常
【参考方案1】:
您可以使用SQL built-in 函数xpath
等从嵌套的XML 结构中提取数据。
给定一个类似的嵌套 XML(为简单起见,我省略了任何标记参数)
<CofiResults>
<ExecutionTime>20201103153839</ExecutionTime>
<FilterClass>S</FilterClass>
<InputData>
<ns2>
<HeaderSegment>
<Version>6</Version>
<SequenceNb>1</SequenceNb>
</HeaderSegment>
</ns2>
</InputData>
</CofiResults>
然后您可以在您的selectExpr
语句中使用这些 SQL 函数(不带 createOrReplaceTempView
),如下所示:
.selectExpr("CAST(payload AS STRING) as payload")
.selectExpr(
"xpath(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsArryString",
"xpath_long(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsLong",
"xpath_string(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsString",
"xpath_int(payload, '/CofiResults/InputData/ns2/HeaderSegment/Version/text()') as VersionAsInt")
请记住,xpath
函数将返回一个 Array 字符串,而您可能会发现将值提取为 String 甚至 Long 更方便。在 Spark 3.0.1 中使用控制台接收器流应用上述代码将导致:
+-------------------------+-------------------+---------------------+------------+
|ExecutionTimeAsArryString|ExecutionTimeAsLong|ExecutionTimeAsString|VersionAsInt|
+-------------------------+-------------------+---------------------+------------+
|[20201103153839] |20201103153839 |20201103153839 |6 |
+-------------------------+-------------------+---------------------+------------+
【讨论】:
以上是关于如何从 Spark-Structured-Streaming 中的嵌套 XML_String 中提取信息的主要内容,如果未能解决你的问题,请参考以下文章
如何将数据从回收器适配器发送到片段 |如何从 recyclerview 适配器调用片段函数
如何从服务器获取和设置 android 中的 API(从服务器获取 int 值)?如何绑定和实现这个