如何使用 Spark-Xml 生成复杂的 XML

Posted

技术标签:

【中文标题】如何使用 Spark-Xml 生成复杂的 XML【英文标题】:How to Generate a complex XML using Spark-Xml 【发布时间】:2018-04-09 09:40:01 【问题描述】:

我正在尝试从我的 JavaRDd 和 JavaRdd 生成一个复杂的 xml,我怎样才能将这两者结合起来生成下面的 xml?

<xml>
<library>
    <books>
        <book>
            <author>test</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
    </reviews>
</library>

如您所见,有一个父根库,其中包含子书和评论。

以下是我如何生成 Book and Review Dataframe

DataFrame bookFrame = sqlCon.createDataFrame(bookRDD, Book.class);
DataFrame reviewFrame = sqlCon.createDataFrame(reviewRDD, Review.class);

我知道要生成 xml,我特别怀疑是否拥有 Library rootTag 并将 Books and Reviews 作为其子项。

我正在使用 Java。但如果你能指出我的正确,你可以编写 Scala 或 Python 示例。

【问题讨论】:

那行不通。根标签很简单(有一个选项),但你不能在一个 DataFrame 中拥有不同类型的对象(不同的模式)。 @user9613318 那么我应该在生成2个不同的xml之后执行字符串连接吗?这是唯一的出路吗? @user9613318 你能帮我解决这个问题吗? ***.com/questions/50007809/… @user9613318 你能帮我解决这个问题吗***.com/questions/50131641/… 【参考方案1】:

这可能不是使用 Spark 执行此操作的最有效方式,但下面的代码可以按您的意愿工作。 (虽然是 Scala,因为我的 Java 有点生疏)

import java.io.File, PrintWriter
import org.apache.spark.sql.SaveMode, SparkSession
import scala.io.Source

val spark = SparkSession.builder()
  .master("local[3]")
  .appName("test")
  .config("spark.driver.allowMultipleContexts", "true")
  .getOrCreate()

import spark.implicits._

/* Some code to test */
case class Book(author: String)
case class Review(id: Int)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)


val bookFrame = List(
  Book("book1"),
  Book("book2"),
  Book("book3"),
  Book("book4"),
  Book("book5")
).toDS()


val reviewFrame = List(
  Review(1),
  Review(2),
  Review(3),
  Review(4)
).toDS()

/* End test code **/

// Using databricks api save as 1 big xml file (instead of many parts, using repartition)
// You don't have to use repartition, but each part-xxx file will wrap contents in the root tag, making it harder to concat later.
// And TBH it really doesn't matter that Spark is doing the merging here, since the combining of data is already on the master node only
bookFrame
  .repartition(1)
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "books")
  .option("rowTag", "book")
  .mode(SaveMode.Overwrite)
  .save("/tmp/books/") // store to temp location

// Same for reviews
reviewFrame
  .repartition(1)
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "reviews")
  .option("rowTag", "review")
  .mode(SaveMode.Overwrite)
  .save("/tmp/review") // store to temp location


def concatFiles(path:String):List[String] = 
  new File(path)
    .listFiles
    .filter(
      _.getName.startsWith("part") // get all part-xxx files only (should be only 1)
    )
    .flatMap(file => Source.fromFile(file.getAbsolutePath).getLines()) 
    .map("    " + _) // prefix with spaces to allow for new root level xml
    .toList


val lines = List("<xml>","<library>") ++ concatFiles("/tmp/books/") ++ concatFiles("/tmp/review/") ++ List("</library>")
new PrintWriter("/tmp/target.xml")
  write(lines.mkString("\n"))
  close

结果:

<xml>
<library>
    <books>
        <book>
            <author>book1</author>
        </book>
        <book>
            <author>book2</author>
        </book>
        <book>
            <author>book3</author>
        </book>
        <book>
            <author>book4</author>
        </book>
        <book>
            <author>book5</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
        <review>
            <id>2</id>
        </review>
        <review>
            <id>3</id>
        </review>
        <review>
            <id>4</id>
        </review>
    </reviews>
</library>

另一种方法可能是(仅使用 spark)创建一个新对象 case class BookReview(books: List[Book], reviews: List[Review]) 并在 .collect() 之后将所有书籍和评论存储到 xml 到一个列表中。

虽然那时我不会使用 spark 只处理单个记录 (BookReview),而是使用普通的 xml 库(如 xstream 左右)来存储此对象。


更新 List concat 方法对内存不友好,因此使用流和缓冲区这可能是一个解决方案,而不是concatFiles 方法。

def outputConcatFiles(path: String, outputFile: File): Unit = 
  new File(path)
    .listFiles
    .filter(
      _.getName.startsWith("part") // get all part-xxx files only (should be only 1)
    )
    .foreach(file => 
      val writer = new BufferedOutputStream(new FileOutputStream(outputFile, true))
      val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))

      try 
        Stream.continually(reader.readLine())
          .takeWhile(_ != null)
          .foreach(line =>
            writer.write(s"    $line\n".getBytes)
          )
       catch 
        case e: Exception => println(e.getMessage)
       finally 
        writer.close()
        reader.close()
      
    )


val outputFile = new File("/tmp/target2.xml")
new PrintWriter(outputFile)  write("<xml>\n<library>\n"); close
outputConcatFiles("/tmp/books/", outputFile)
outputConcatFiles("/tmp/review/", outputFile)
new PrintWriter(new FileOutputStream(outputFile, true))  append("</library>"); close

【讨论】:

感谢您的回答。我收集了大量的记录,接近五十万,因此我无法列出它。在当前配置中,它会抛出 OutOfMemory... 但我会尝试您所解释的串联。 好点。我已经更新了我的答案以允许 FileStreams 等 你能帮我解决这个问题吗? ***.com/questions/50007809/… Tom Lous ***.com/questions/50131641/… 你能帮我解决这个问题吗

以上是关于如何使用 Spark-Xml 生成复杂的 XML的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 spark-xml 包使用 XSD 解析 XML?

如何在 spark-xml 中禁用科学计数法

(spark-xml) 使用 from_xml 函数解析 xml 列时仅接收 null

spark-xml 中具有嵌套父节点的自定义模式

使用 spark-xml 从 pyspark 数据框中选择嵌套列

Spark-xml 在读取处理指令时崩溃