如何使用 Spark-Xml 生成复杂的 XML
Posted
技术标签:
【中文标题】如何使用 Spark-Xml 生成复杂的 XML【英文标题】:How to Generate a complex XML using Spark-Xml 【发布时间】:2018-04-09 09:40:01 【问题描述】:我正在尝试从我的 JavaRDd
<xml>
<library>
<books>
<book>
<author>test</author>
</book>
</books>
<reviews>
<review>
<id>1</id>
</review>
</reviews>
</library>
如您所见,有一个父根库,其中包含子书和评论。
以下是我如何生成 Book and Review Dataframe
DataFrame bookFrame = sqlCon.createDataFrame(bookRDD, Book.class);
DataFrame reviewFrame = sqlCon.createDataFrame(reviewRDD, Review.class);
我知道要生成 xml,我特别怀疑是否拥有 Library rootTag 并将 Books and Reviews 作为其子项。
我正在使用 Java。但如果你能指出我的正确,你可以编写 Scala 或 Python 示例。
【问题讨论】:
那行不通。根标签很简单(有一个选项),但你不能在一个DataFrame
中拥有不同类型的对象(不同的模式)。
@user9613318 那么我应该在生成2个不同的xml之后执行字符串连接吗?这是唯一的出路吗?
@user9613318 你能帮我解决这个问题吗? ***.com/questions/50007809/…
@user9613318 你能帮我解决这个问题吗***.com/questions/50131641/…
【参考方案1】:
这可能不是使用 Spark 执行此操作的最有效方式,但下面的代码可以按您的意愿工作。 (虽然是 Scala,因为我的 Java 有点生疏)
import java.io.File, PrintWriter
import org.apache.spark.sql.SaveMode, SparkSession
import scala.io.Source
val spark = SparkSession.builder()
.master("local[3]")
.appName("test")
.config("spark.driver.allowMultipleContexts", "true")
.getOrCreate()
import spark.implicits._
/* Some code to test */
case class Book(author: String)
case class Review(id: Int)
org.apache.spark.sql.catalyst.encoders.OuterScopes.addOuterScope(this)
val bookFrame = List(
Book("book1"),
Book("book2"),
Book("book3"),
Book("book4"),
Book("book5")
).toDS()
val reviewFrame = List(
Review(1),
Review(2),
Review(3),
Review(4)
).toDS()
/* End test code **/
// Using databricks api save as 1 big xml file (instead of many parts, using repartition)
// You don't have to use repartition, but each part-xxx file will wrap contents in the root tag, making it harder to concat later.
// And TBH it really doesn't matter that Spark is doing the merging here, since the combining of data is already on the master node only
bookFrame
.repartition(1)
.write
.format("com.databricks.spark.xml")
.option("rootTag", "books")
.option("rowTag", "book")
.mode(SaveMode.Overwrite)
.save("/tmp/books/") // store to temp location
// Same for reviews
reviewFrame
.repartition(1)
.write
.format("com.databricks.spark.xml")
.option("rootTag", "reviews")
.option("rowTag", "review")
.mode(SaveMode.Overwrite)
.save("/tmp/review") // store to temp location
def concatFiles(path:String):List[String] =
new File(path)
.listFiles
.filter(
_.getName.startsWith("part") // get all part-xxx files only (should be only 1)
)
.flatMap(file => Source.fromFile(file.getAbsolutePath).getLines())
.map(" " + _) // prefix with spaces to allow for new root level xml
.toList
val lines = List("<xml>","<library>") ++ concatFiles("/tmp/books/") ++ concatFiles("/tmp/review/") ++ List("</library>")
new PrintWriter("/tmp/target.xml")
write(lines.mkString("\n"))
close
结果:
<xml>
<library>
<books>
<book>
<author>book1</author>
</book>
<book>
<author>book2</author>
</book>
<book>
<author>book3</author>
</book>
<book>
<author>book4</author>
</book>
<book>
<author>book5</author>
</book>
</books>
<reviews>
<review>
<id>1</id>
</review>
<review>
<id>2</id>
</review>
<review>
<id>3</id>
</review>
<review>
<id>4</id>
</review>
</reviews>
</library>
另一种方法可能是(仅使用 spark)创建一个新对象
case class BookReview(books: List[Book], reviews: List[Review])
并在 .collect()
之后将所有书籍和评论存储到 xml 到一个列表中。
虽然那时我不会使用 spark 只处理单个记录 (BookReview),而是使用普通的 xml 库(如 xstream 左右)来存储此对象。
更新
List concat 方法对内存不友好,因此使用流和缓冲区这可能是一个解决方案,而不是concatFiles
方法。
def outputConcatFiles(path: String, outputFile: File): Unit =
new File(path)
.listFiles
.filter(
_.getName.startsWith("part") // get all part-xxx files only (should be only 1)
)
.foreach(file =>
val writer = new BufferedOutputStream(new FileOutputStream(outputFile, true))
val reader = new BufferedReader(new InputStreamReader(new FileInputStream(file)))
try
Stream.continually(reader.readLine())
.takeWhile(_ != null)
.foreach(line =>
writer.write(s" $line\n".getBytes)
)
catch
case e: Exception => println(e.getMessage)
finally
writer.close()
reader.close()
)
val outputFile = new File("/tmp/target2.xml")
new PrintWriter(outputFile) write("<xml>\n<library>\n"); close
outputConcatFiles("/tmp/books/", outputFile)
outputConcatFiles("/tmp/review/", outputFile)
new PrintWriter(new FileOutputStream(outputFile, true)) append("</library>"); close
【讨论】:
感谢您的回答。我收集了大量的记录,接近五十万,因此我无法列出它。在当前配置中,它会抛出 OutOfMemory... 但我会尝试您所解释的串联。 好点。我已经更新了我的答案以允许 FileStreams 等 你能帮我解决这个问题吗? ***.com/questions/50007809/… Tom Lous ***.com/questions/50131641/… 你能帮我解决这个问题吗以上是关于如何使用 Spark-Xml 生成复杂的 XML的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 spark-xml 包使用 XSD 解析 XML?
(spark-xml) 使用 from_xml 函数解析 xml 列时仅接收 null