如何从 Spark-Structured-Streaming 中的嵌套 XML_String 中提取信息

Posted

技术标签:

【中文标题】如何从 Spark-Structured-Streaming 中的嵌套 XML_String 中提取信息【英文标题】:How to extract information from a nested XML_String in Spark-Structured-Streaming 【发布时间】:2020-12-15 14:02:07 【问题描述】:

我有一个连接到 ActiveMQ 的火花结构应用程序。应用程序接收来自主题的消息。这些消息采用 StringXML 的形式。我想从这个嵌套 XML 中提取信息。我该怎么做?

我提到了this post,但无法在 Scala 中实现类似的东西。

XML 格式:

<CofiResults>
  <ExecutionTime>20201103153839</ExecutionTime>
  <FilterClass>S </FilterClass>
  <InputData format="something" id="someID"><ns2:FrdReq xmlns:ns2="http://someone.com">
    <HeaderSegment xmlns="https://somelink.com">
        <Version>6</Version>
        <SequenceNb>1</SequenceNb>
    </HeaderSegment>
.
.
.

我的代码:

val df = spark.readStream
            .format("org.apache.bahir.sql.streaming.mqtt.MQTTStreamSourceProvider")
            .option("brokerUrl", brokerUrl_)
            .option("topic", topicName_)
            .option("persistence", "memory")
            .option("cleanSession", "true")
            .option("username", username_)
            .option("password", password_)
            .load()

val payload_ = df.select('payload cast "string") // This payload IS the XMLString

现在我需要从上述XML中提取ExecutionTimeVersion等字段。

【问题讨论】:

使用 import xml.etree.ElementTree 我对 Scala 还很陌生......并且不知道如何做到这一点 在spark sql中使用xpath怎么样?spark.apache.org/docs/latest/api/sql/#xpath 您能解释一下如何在 UDF 中使用它吗?谢谢 对不起,我的意思是spark.sql("select xpath(payload, &lt;xpathquery&gt;) from dataframe")。 payload 是 xml 字符串列 非常感谢...在将数据帧转换为 tempTable 然后运行 ​​sql 命令后,它工作正常 【参考方案1】:

您可以使用SQL built-in 函数xpath 等从嵌套的XML 结构中提取数据。

给定一个类似的嵌套 XML(为简单起见,我省略了任何标记参数)

<CofiResults>
  <ExecutionTime>20201103153839</ExecutionTime>
  <FilterClass>S</FilterClass>
  <InputData>
    <ns2>
      <HeaderSegment>
        <Version>6</Version>
        <SequenceNb>1</SequenceNb>
      </HeaderSegment>
    </ns2>
  </InputData>
</CofiResults>

然后您可以在您的selectExpr 语句中使用这些 SQL 函数(不带 createOrReplaceTempView),如下所示:

  .selectExpr("CAST(payload AS STRING) as payload")
  .selectExpr(
    "xpath(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsArryString",
    "xpath_long(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsLong",
    "xpath_string(payload, '/CofiResults/ExecutionTime/text()') as ExecutionTimeAsString",
    "xpath_int(payload, '/CofiResults/InputData/ns2/HeaderSegment/Version/text()') as VersionAsInt")

请记住,xpath 函数将返回一个 Array 字符串,而您可能会发现将值提取为 String 甚至 Long 更方便。在 Spark 3.0.1 中使用控制台接收器流应用上述代码将导致:

+-------------------------+-------------------+---------------------+------------+
|ExecutionTimeAsArryString|ExecutionTimeAsLong|ExecutionTimeAsString|VersionAsInt|
+-------------------------+-------------------+---------------------+------------+
|[20201103153839]         |20201103153839     |20201103153839       |6           |
+-------------------------+-------------------+---------------------+------------+

【讨论】:

以上是关于如何从 Spark-Structured-Streaming 中的嵌套 XML_String 中提取信息的主要内容,如果未能解决你的问题,请参考以下文章

如何将数据从回收器适配器发送到片段 |如何从 recyclerview 适配器调用片段函数

如何从 Firebase 获取所有设备令牌?

如何直接从类调用从接口继承的方法?

如何从服务器获取和设置 android 中的 API(从服务器获取 int 值)?如何绑定和实现这个

如何从Mac从android studio中的fabric注销? [复制]

如何从设备中获取 PDF 文件以便能够从我的应用程序中上传?