Spark-xml crashes on reading processing instructions

Posted: 2019-08-20 20:55:05

【Question】:

I am trying to use the Databricks spark-xml package to read an XML file into a Spark dataframe. However, when it encounters a processing instruction, Spark raises an error claiming an unexpected event occurred.

I am trying to import the XML file into a dataframe, which I can then flatten and write out as a CSV. The dataset is large enough that we need some kind of distributed processing engine such as Spark. I have looked through the spark-xml documentation and cannot find any mention of processing instructions. I don't actually need any of the information in the instructions, so I would happily ignore them if that were an option; as it stands, though, they break parsing of the whole file. Any suggestions would be appreciated.

Here is an XML snippet that reproduces the problem:

<?xml version="1.0" encoding="UTF-8"?>
<row>
<description>
<?issue?>
<text>foo</text>
</description>
</row>
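
If pre-processing is an acceptable way to ignore them, something like the following could strip the processing instructions before Spark ever reads the file. This is a minimal sketch: it assumes each instruction fits on a single line, it keeps the <?xml ...?> declaration, and the file names are illustrative:

import re

# Remove processing instructions such as <?issue?>, but keep the
# <?xml ...?> declaration on the first line.
pi_pattern = re.compile(r"<\?(?!xml\b).*?\?>")

with open("example.xml") as src, open("example_clean.xml", "w") as dst:
    for line in src:
        dst.write(pi_pattern.sub("", line))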

Here is how I am trying to read the XML in Python:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sql = SQLContext(sc)
xml = sql.read.format("com.databricks.spark.xml").option("rowTag", "row").load("example.xml")

For completeness, here is how I load the Databricks packages and submit the script to Spark:

spark-submit --packages com.databricks:spark-csv_2.11:1.5.0,com.databricks:spark-xml_2.10:0.4.1 example.py

When I try to read the XML with the code above, Spark raises an exception claiming an "unexpected event". The exact error message is below.

2019-08-20 13:47:03 ERROR Executor:91 - Exception in task 0.0 in stage 0.0 (TID 0)
java.lang.RuntimeException: Failed to parse data with unexpected event <?issue ?>
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:151)
    at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:178)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:101)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:89)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)
2019-08-20 13:47:03 WARN  TaskSetManager:66 - Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: Failed to parse data with unexpected event <?issue ?>
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:151)
    at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:178)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:101)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:89)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

2019-08-20 13:47:03 ERROR TaskSetManager:70 - Task 0 in stage 0.0 failed 1 times; aborting job
Traceback (most recent call last):
  File "/oak/stanford/groups/hlwill/gsmoore/projects/parser_new/***/example.py", line 10, in <module>
    xml = sql.read.format("com.databricks.spark.xml").option("rowTag", "row").load("example.xml")
  File "/share/software/user/open/spark/2.3.0/python/pyspark/sql/readwriter.py", line 166, in load
    return self._df(self._jreader.load(path))
  File "/share/software/user/open/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/java_gateway.py", line 1160, in __call__
  File "/share/software/user/open/spark/2.3.0/python/pyspark/sql/utils.py", line 63, in deco
    return f(*a, **kw)
  File "/share/software/user/open/spark/2.3.0/python/lib/py4j-0.10.6-src.zip/py4j/protocol.py", line 320, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling o27.load.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 0.0 failed 1 times, most recent failure: Lost task 0.0 in stage 0.0 (TID 0, localhost, executor driver): java.lang.RuntimeException: Failed to parse data with unexpected event <?issue ?>
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:151)
    at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:178)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:101)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:89)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1599)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1587)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1586)
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1586)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:831)
    at scala.Option.foreach(Option.scala:257)
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:831)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:1820)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1769)
    at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:1758)
    at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:642)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2027)
    at org.apache.spark.SparkContext.runJob(SparkContext.scala:2124)
    at org.apache.spark.rdd.RDD$$anonfun$fold$1.apply(RDD.scala:1092)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.fold(RDD.scala:1086)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1.apply(RDD.scala:1155)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
    at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1131)
    at com.databricks.spark.xml.util.InferSchema$.infer(InferSchema.scala:109)
    at com.databricks.spark.xml.XmlRelation$$anonfun$1.apply(XmlRelation.scala:46)
    at com.databricks.spark.xml.XmlRelation$$anonfun$1.apply(XmlRelation.scala:46)
    at scala.Option.getOrElse(Option.scala:121)
    at com.databricks.spark.xml.XmlRelation.<init>(XmlRelation.scala:45)
    at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:65)
    at com.databricks.spark.xml.DefaultSource.createRelation(DefaultSource.scala:43)
    at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:340)
    at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:239)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:174)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:214)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.RuntimeException: Failed to parse data with unexpected event <?issue ?>
    at scala.sys.package$.error(package.scala:27)
    at com.databricks.spark.xml.util.InferSchema$.inferField(InferSchema.scala:151)
    at com.databricks.spark.xml.util.InferSchema$.com$databricks$spark$xml$util$InferSchema$$inferObject(InferSchema.scala:178)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:101)
    at com.databricks.spark.xml.util.InferSchema$$anonfun$3$$anonfun$apply$2.apply(InferSchema.scala:89)
    at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
    at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
    at scala.collection.Iterator$class.foreach(Iterator.scala:893)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.foldLeft(TraversableOnce.scala:157)
    at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1336)
    at scala.collection.TraversableOnce$class.aggregate(TraversableOnce.scala:214)
    at scala.collection.AbstractIterator.aggregate(Iterator.scala:1336)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$23.apply(RDD.scala:1139)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$treeAggregate$1$$anonfun$24.apply(RDD.scala:1140)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitions$1$$anonfun$apply$23.apply(RDD.scala:800)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:38)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
    at org.apache.spark.scheduler.Task.run(Task.scala:109)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:345)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    ... 1 more

【Comments】:

<?issue?> — that's not valid XML.

I have found multiple sources (here, and the processing instructions Wikipedia page) indicating that any node enclosed in <? ?> is treated as a processing instruction, and that such a node is valid XML. According to Wikipedia: "An XML processing instruction is enclosed within <? and ?>, and contains a target and optionally some content, the node value, which cannot contain the sequence ?>: <?PITarget PIContent?>." Since the string <?issue?> satisfies all of these criteria, it appears to be valid XML.

Learned something new! Thank you. It seems the XML reader just can't handle XML processing instructions.
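
As a quick sanity check (an aside, not from the original thread): Python's standard-library parser accepts the snippet, treating <?issue?> as a well-formed processing instruction and silently dropping it, which supports the comment above.

import xml.etree.ElementTree as ET

# Bytes literal so the encoding declaration inside the snippet is honored.
snippet = b"""<?xml version="1.0" encoding="UTF-8"?>
<row>
<description>
<?issue?>
<text>foo</text>
</description>
</row>"""

# ElementTree parses the document and simply skips the processing
# instruction, confirming the snippet is well-formed XML.
root = ET.fromstring(snippet)
print(root.find("./description/text").text)  # prints: foo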

【Answer 1】:

Finally figured it out: it turns out I had been using an outdated version of spark-xml. At least for now, the correct way to load the Databricks packages is:

spark-submit --packages com.databricks:spark-csv_2.11:1.5.0,com.databricks:spark-xml_2.11:0.6.0 example.py

This makes two things true:

- All of the packages run on the same Scala version, 2.11 (which should match the version your Spark installation was built against). You can check which Spark version you are running by typing spark-shell --version.
- According to their GitHub pages, I am using the latest version of each package.
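
For reference, here is a minimal end-to-end sketch of the original goal (read the XML, flatten it, write a CSV) using the upgraded packages. The selectExpr call and the output path are illustrative assumptions, not from the original post, and the CSV is written with Spark's built-in writer:

from pyspark import SparkContext
from pyspark.sql import SQLContext

sc = SparkContext()
sql = SQLContext(sc)

# With spark-xml_2.11:0.6.0, the <?issue?> processing instruction no
# longer aborts schema inference.
df = sql.read.format("com.databricks.spark.xml") \
    .option("rowTag", "row") \
    .load("example.xml")

# Hypothetical flattening step: lift the nested field to the top level.
flat = df.selectExpr("description.text AS text")

flat.write.option("header", "true").csv("example_csv")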

【Discussion】:
