How to generate complex XML with spark-xml

I am trying to generate a complex XML from my JavaRDD&lt;Book&gt; and JavaRDD&lt;Reviews&gt;. How can I generate the XML below from these two?

<xml>
<library>
    <books>
        <book>
            <author>test</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
    </reviews>
</library>

As you can see, there is a parent root, library, which contains books and reviews as children.

Here is how I generate the Book and Review DataFrames:

DataFrame bookFrame = sqlCon.createDataFrame(bookRDD, Book.class);
DataFrame reviewFrame = sqlCon.createDataFrame(reviewRDD, Review.class);

I know how to generate the XML; my doubt is specifically about having the library rootTag with books and reviews as its children.

I am using Java, but if you can point me in the right direction, Scala or Python examples are fine too.

Answer

It may not be the most efficient way to do this with Spark, but the code below works as you want. (In Scala though, since my Java is a bit rusty.)

import java.io.{File, PrintWriter}
import org.apache.spark.sql.{SaveMode, SparkSession}
import scala.io.Source

val spark = SparkSession.builder()
  .master("local[3]")
  .appName("test")
  .config("spark.driver.allowMultipleContexts", "true")
  .getOrCreate()

import spark.implicits._

/* Some code to test */
case class Book(author: String)
case class Review(id: Int)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)


val bookFrame = List(
  Book("book1"),
  Book("book2"),
  Book("book3"),
  Book("book4"),
  Book("book5")
).toDS()


val reviewFrame = List(
  Review(1),
  Review(2),
  Review(3),
  Review(4)
).toDS()

/* End test code **/

// Using databricks api save as 1 big xml file (instead of many parts, using repartition)
// You don't have to use repartition, but each part-xxx file will wrap contents in the root tag, making it harder to concat later.
// And TBH it really doesn't matter that Spark is doing the merging here, since the combining of data is already on the master node only
bookFrame
  .repartition(1)
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "books")
  .option("rowTag", "book")
  .mode(SaveMode.Overwrite)
  .save("/tmp/books/") // store to temp location

// Same for reviews
reviewFrame
  .repartition(1)
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "reviews")
  .option("rowTag", "review")
  .mode(SaveMode.Overwrite)
  .save("/tmp/review") // store to temp location


def concatFiles(path:String):List[String] = {
  new File(path)
    .listFiles
    .filter(
      _.getName.startsWith("part") // get all part-xxx files only (should be only 1)
    )
    .flatMap(file => Source.fromFile(file.getAbsolutePath).getLines()) 
    .map("    " + _) // prefix with spaces to allow for new root level xml
    .toList
}

val lines = List("<xml>","<library>") ++ concatFiles("/tmp/books/") ++ concatFiles("/tmp/review/") ++ List("</library>")
new PrintWriter("/tmp/target.xml"){
  write(lines.mkString("
"))
  close
}

Result:

<xml>
<library>
    <books>
        <book>
            <author>book1</author>
        </book>
        <book>
            <author>book2</author>
        </book>
        <book>
            <author>book3</author>
        </book>
        <book>
            <author>book4</author>
        </book>
        <book>
            <author>book5</author>
        </book>
    </books>
    <reviews>
        <review>
            <id>1</id>
        </review>
        <review>
            <id>2</id>
        </review>
        <review>
            <id>3</id>
        </review>
        <review>
            <id>4</id>
        </review>
    </reviews>
</library>

Another approach could be (using only Spark) to create a new object case class BookReview(books: List[Book], reviews: List[Review]) and store that to XML after using .collect() to put all the books and reviews into a single list.

Although in that case I would not use Spark to process the single record (BookReview), but rather a plain XML library (like xstream or so) to store that object.
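A rough, untested sketch of that idea is shown below; the Books/Reviews wrapper classes and the /tmp/library/ path are made up here so that spark-xml also emits the <books>/<reviews> grouping elements around the collected rows:

// Hypothetical wrapper classes so the grouping elements appear in the nested structure
case class Books(book: Seq[Book])
case class Reviews(review: Seq[Review])
case class BookReview(books: Books, reviews: Reviews)

// Collect everything to the driver and write it back as a single <library> row
val combined = Seq(
  BookReview(Books(bookFrame.collect().toSeq), Reviews(reviewFrame.collect().toSeq))
).toDS()

combined
  .repartition(1)
  .write
  .format("com.databricks.spark.xml")
  .option("rootTag", "xml")
  .option("rowTag", "library")
  .mode(SaveMode.Overwrite)
  .save("/tmp/library/")

Since the whole structure is a single object on the driver at that point, a plain XML library could serialize it just as well without going through Spark.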


Update: the List concat approach is not very memory friendly, so using a stream and a buffer, something like the following could be a solution instead of the concatFiles method.

import java.io.{BufferedOutputStream, BufferedReader, FileInputStream, FileOutputStream, InputStreamReader}

def outputConcatFiles(path: String, outputFile: File): Unit = {
  new File(path)
    .listFiles
    .filter(
      _.getName.startsWith("part") // get all part-xxx files only (should be only 1)
    )
    .foreach(file => {
      val writer = new BufferedOutputStream(new FileOutputStream(outputFile, true))
      val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))

      try {
        Stream.continually(reader.readLine())
          .takeWhile(_ != null)
          .foreach(line =>
            writer.write(s"    $line
".getBytes)
          )
      } catch {
        case e: Exception => println(e.getMessage)
      } finally {
        writer.close()
        reader.close()
      }
    })
}

val outputFile = new File("/tmp/target2.xml")
new PrintWriter(outputFile) { write("<xml>
<library>
"); close}
outputConcatFiles("/tmp/books/", outputFile)
outputConcatFiles("/tmp/review/", outputFile)
new PrintWriter(new FileOutputStream(outputFile, true)) { append("</library>"); close}
