使用 spark 读取 xml 时如何识别或重新路由错误的 xml
Posted
技术标签:
【中文标题】使用 spark 读取 xml 时如何识别或重新路由错误的 xml【英文标题】:How to identify or reroute bad xml's when reading xmls with spark 【发布时间】:2019-06-13 19:06:36 【问题描述】:使用 spark,我正在尝试从路径中读取一堆 xml,其中一个文件是不是 xml 的虚拟文件。
我希望 spark 告诉我一个特定的文件是无效的,无论如何
添加 "badRecordsPath" otiton 将坏数据写入 JSON 文件的指定位置,但同样不适用于 xml,还有其他方法吗?
df = (spark.read.format('json')
.option('badRecordsPath','/tmp/data/failed')
.load('/tmp/data/dummy.json')
【问题讨论】:
【参考方案1】:据我所知....不幸的是,直到今天,它还没有以 声明性方式...在您期望的方式中在 spark 的 xml 包中可用...
自从FailureSafeParser
实现如下以来,Json 一直在工作......在DataFrameReader 中
/**
* Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
* text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
*
* Unless the schema is specified using `schema` function, this function goes through the
* input once to determine the input schema.
*
* @param jsonDataset input Dataset with one JSON object per record
* @since 2.2.0
*/
def json(jsonDataset: Dataset[String]): DataFrame =
val parsedOptions = new JSONOptions(
extraOptions.toMap,
sparkSession.sessionState.conf.sessionLocalTimeZone,
sparkSession.sessionState.conf.columnNameOfCorruptRecord)
val schema = userSpecifiedSchema.getOrElse
TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
ExprUtils.verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
val actualSchema =
StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))
val createParser = CreateJacksonParser.string _
val parsed = jsonDataset.rdd.mapPartitions iter =>
val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
val parser = new FailureSafeParser[String](
input => rawParser.parse(input, createParser, UTF8String.fromString),
parsedOptions.parseMode,
schema,
parsedOptions.columnNameOfCorruptRecord)
iter.flatMap(parser.parse)
sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
您可以以编程方式实现功能。
使用 sc.textFile
读取文件夹中的所有文件。
foreach 文件使用 xml 解析器解析条目。
如果它的有效重定向到另一个路径。
如果无效,则写入坏记录路径。
【讨论】:
我只是在等待其他人提供任何其他选择:)以上是关于使用 spark 读取 xml 时如何识别或重新路由错误的 xml的主要内容,如果未能解决你的问题,请参考以下文章