Spark 中的 XML 处理

Posted

技术标签:

【中文标题】Spark 中的 XML 处理【英文标题】:Xml processing in Spark 【发布时间】:2015-10-12 09:54:32 【问题描述】:

场景: 我的输入将是多个小型 XML,并且我应该将这些 XML 读取为 RDD。与另一个数据集执行连接并形成 RDD 并将输出作为 XML 发送。

是否可以使用 spark 读取 XML,将数据加载为 RDD?如果可能,如何读取 XML。

示例 XML:

<root>
    <users>
        <user>
              <account>1234<\account>
              <name>name_1<\name>
              <number>34233<\number>
         <\user>
         <user>
              <account>58789<\account>
              <name>name_2<\name>
              <number>54697<\number>
         <\user>    
    <\users>
<\root>

这将如何加载到 RDD 中?

【问题讨论】:

顺便说一句,您的 XML 根本不是 XML。需要全部替换\` to /` 嗨帕瓦尼!我从 Spark 上的这个练习开始,我想知道在课堂上更高级的解决方案,你能帮我吗? 【参考方案1】:

是的,有可能,但细节会因您采取的方法而异。

如果文件很小,正如您所提到的,最简单的解决方案是使用SparkContext.wholeTextFiles 加载您的数据。它将数据加载为RDD[(String, String)],其中第一个元素是路径,第二个元素是文件内容。然后像在本地模式中一样单独解析每个文件。 对于较大的文件,您可以使用Hadoop input formats。 如果结构简单,您可以使用textinputformat.record.delimiter 拆分记录。你可以找到一个简单的例子here。输入不是 XML,但您应该告诉您如何进行操作 否则 Mahout 提供 XmlInputFormat

终于可以使用SparkContext.textFile 读取文件并在稍后调整分区之间的记录跨度。从概念上讲,它的意思类似于创建滑动窗口或partitioning records into groups of fixed size:

使用mapPartitionsWithIndex分区来识别分区间损坏的记录,收集损坏的记录 使用第二个mapPartitionsWithIndex 修复损坏的记录

编辑

还有相对较新的spark-xml 包,允许您通过标签提取特定记录:

val df = sqlContext.read
  .format("com.databricks.spark.xml")
   .option("rowTag", "foo")
   .load("bar.xml")

【讨论】:

那么,如何修复损坏的记录? 您将如何处理 XML 中的嵌套键并将它们置于同一级别?【参考方案2】:

下面是使用HadoopInputFormats 来读取spark 中的XML 数据的方法,如@zero323 所述。

输入数据:

<root>
    <users>
        <user>
            <account>1234<\account>
            <name>name_1<\name>
            <number>34233<\number>
        <\user>
        <user>
            <account>58789<\account>
            <name>name_2<\name>
            <number>54697<\number>
        <\user>
    <\users>
<\root>

读取 XML 输入的代码:

你会在link得到一些罐子

进口:

//---------------spark_import
import org.apache.spark.SparkContext
import org.apache.spark.SparkConf
import org.apache.spark.sql.SQLContext

//----------------xml_loader_import
import org.apache.hadoop.io.LongWritable
import org.apache.hadoop.io.Text
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io. LongWritable, Text 
import com.cloudera.datascience.common.XmlInputFormat

代码:

object Tester_loader 
  case class User(account: String, name: String, number: String)
  def main(args: Array[String]): Unit = 

    val sparkHome = "/usr/big_data_tools/spark-1.5.0-bin-hadoop2.6/"
    val sparkMasterUrl = "spark://SYSTEMX:7077"

    var jars = new Array[String](3)

    jars(0) = "/home/hduser/Offload_Data_Warehouse_Spark.jar"
    jars(1) = "/usr/big_data_tools/JARS/Spark_jar/avro/spark-avro_2.10-2.0.1.jar"

    val conf = new SparkConf().setAppName("XML Reading")
    conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .setMaster("local")
      .set("spark.cassandra.connection.host", "127.0.0.1")
      .setSparkHome(sparkHome)
      .set("spark.executor.memory", "512m")
      .set("spark.default.deployCores", "12")
      .set("spark.cores.max", "12")
      .setJars(jars)

    val sc = new SparkContext(conf)
    val sqlContext = new SQLContext(sc)
    import sqlContext.implicits._

    // ---- loading user from XML

    // calling function 1.1
    val pages = readFile("src/input_data", "<user>", "<\\user>", sc) 

    val xmlUserDF = pages.map  tuple =>
      
        val account = extractField(tuple, "account")
        val name = extractField(tuple, "name")
        val number = extractField(tuple, "number")

        User(account, name, number)
      
    .toDF()
    println(xmlUserDF.count())
    xmlUserDF.show()
  

功能:

  def readFile(path: String, start_tag: String, end_tag: String,
      sc: SparkContext) = 

    val conf = new Configuration()
    conf.set(XmlInputFormat.START_TAG_KEY, start_tag)
    conf.set(XmlInputFormat.END_TAG_KEY, end_tag)
    val rawXmls = sc.newAPIHadoopFile(
        path, classOf[XmlInputFormat], classOf[LongWritable],
        classOf[Text], conf)

    rawXmls.map(p => p._2.toString)
  

  def extractField(tuple: String, tag: String) = 
    var value = tuple.replaceAll("\n", " ").replace("<\\", "</")

    if (value.contains("<" + tag + ">") &&
        value.contains("</" + tag + ">")) 
      value = value.split("<" + tag + ">")(1).split("</" + tag + ">")(0)
    
    value
  


输出:

+-------+------+------+
|account|  name|number|
+-------+------+------+
|   1234|name_1| 34233|
|  58789|name_2| 54697|
+-------+------+------+

获得的结果在数据帧中,您可以像这样根据您的要求将它们转换为 RDD->

val xmlUserRDD = xmlUserDF.toJavaRDD.rdd.map  x =>
    (x.get(0).toString(),x.get(1).toString(),x.get(2).toString()) 

请评估一下,如果它对你有帮助的话。

【讨论】:

【参考方案3】:

这会对你有所帮助。

package packagename;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;

import com.databricks.spark.xml.XmlReader;

public class XmlreaderSpark 
    public static void main(String arr[])
    String localxml="file path";
    String booksFileTag = "user";

    String warehouseLocation = "file:" + System.getProperty("user.dir") + "spark-warehouse";
   System.out.println("warehouseLocation" + warehouseLocation);
    SparkSession spark = SparkSession
              .builder()
              .master("local")
              .appName("Java Spark SQL Example")
              .config("spark.some.config.option", "some-value").config("spark.sql.warehouse.dir", warehouseLocation)
              .enableHiveSupport().config("set spark.sql.crossJoin.enabled", "true")
              .getOrCreate();
    SQLContext sqlContext = new SQLContext(spark);

    Dataset<Row> df = (new XmlReader()).withRowTag(booksFileTag).xmlFile(sqlContext, localxml);
    df.show();

    

你需要在你的 POM.xml 中添加这个依赖:

<dependency>
   <groupId>com.databricks</groupId>
   <artifactId>spark-xml_2.10</artifactId>
   <version>0.4.0</version>
</dependency>

您的输入文件格式不正确。

谢谢。

【讨论】:

【参考方案4】:

对于简单的情况有两个不错的选择:

wholeTextFiles。在您的 XML 解析器中使用 map 方法,它可以是 Scala XML 拉式解析器(编码速度更快)或 SAX 拉式解析器(性能更好)。 Hadoop streaming XMLInputFormat 必须定义开始和结束标签 &lt;user&gt; &lt;/user&gt; 来处理它,但是,它会为每个用户标签创建一个分区 spark-xml package 也是一个不错的选择。

使用所有选项,您只能处理简单的 XML,这些 XML 可以解释为具有行和列的数据集。

但是,如果我们让它变得有点复杂,这些选项就没有用了。

例如,如果您还有一个实体:

<root>
    <users>
    <user>...</users>
    <companies>
    <company>...</companies>
</root>

现在您需要生成 2 个 RDD 并更改解析器以识别 &lt;company&gt; 标记。

这只是一个简单的例子,但 XML 可能要复杂得多,您需要包含越来越多的更改。

为了解决这种复杂性,我们在 Apache Spark 之上构建了 Flexter,以减轻 processing XML files on Spark 的痛苦。我还建议阅读converting XML on Spark to Parquet。后一篇文章还包含一些代码示例,展示了如何使用 SparkSQL 查询输出。

免责声明:我为 Sonra 工作

【讨论】:

我建议你添加一个免责声明,你是这家公司的联合创始人。

以上是关于Spark 中的 XML 处理的主要内容,如果未能解决你的问题,请参考以下文章

Spark实战练习01--XML数据处理

Spark-xml 在读取处理指令时崩溃

使用 databrics spark API 推断 xml 处理的模式选项

多个NameNode的HDFS集群切换HA后,Spark应用变得很慢的处理办法

如何从 SPARK SCALA 中的 XML 模式中获取列名?

在 Spark UDF 中处理 XML 字符串并返回 Struct Field