使用 spark 读取 xml 时如何识别或重新路由错误的 xml

Posted

技术标签:

【中文标题】使用 spark 读取 xml 时如何识别或重新路由错误的 xml【英文标题】:How to identify or reroute bad xml's when reading xmls with spark 【发布时间】:2019-06-13 19:06:36 【问题描述】:

使用 spark,我正在尝试从路径中读取一堆 xml,其中一个文件是不是 xml 的虚拟文件。

我希望 spark 告诉我一个特定的文件是无效的,无论如何

添加 "badRecordsPath" otiton 将坏数据写入 JSON 文件的指定位置,但同样不适用于 xml,还有其他方法吗?

df = (spark.read.format('json')
      .option('badRecordsPath','/tmp/data/failed')
      .load('/tmp/data/dummy.json')

【问题讨论】:

【参考方案1】:

据我所知....不幸的是,直到今天,它还没有以 声明性方式...在您期望的方式中在 spark 的 xml 包中可用...

自从FailureSafeParser 实现如下以来,Json 一直在工作......在DataFrameReader 中

/**
   * Loads a `Dataset[String]` storing JSON objects (<a href="http://jsonlines.org/">JSON Lines
   * text format or newline-delimited JSON</a>) and returns the result as a `DataFrame`.
   *
   * Unless the schema is specified using `schema` function, this function goes through the
   * input once to determine the input schema.
   *
   * @param jsonDataset input Dataset with one JSON object per record
   * @since 2.2.0
   */
  def json(jsonDataset: Dataset[String]): DataFrame = 
    val parsedOptions = new JSONOptions(
      extraOptions.toMap,
      sparkSession.sessionState.conf.sessionLocalTimeZone,
      sparkSession.sessionState.conf.columnNameOfCorruptRecord)

    val schema = userSpecifiedSchema.getOrElse 
      TextInputJsonDataSource.inferFromDataset(jsonDataset, parsedOptions)
    

    ExprUtils.verifyColumnNameOfCorruptRecord(schema, parsedOptions.columnNameOfCorruptRecord)
    val actualSchema =
      StructType(schema.filterNot(_.name == parsedOptions.columnNameOfCorruptRecord))

    val createParser = CreateJacksonParser.string _
    val parsed = jsonDataset.rdd.mapPartitions  iter =>
      val rawParser = new JacksonParser(actualSchema, parsedOptions, allowArrayAsStructs = true)
      val parser = new FailureSafeParser[String](
        input => rawParser.parse(input, createParser, UTF8String.fromString),
        parsedOptions.parseMode,
        schema,
        parsedOptions.columnNameOfCorruptRecord)
      iter.flatMap(parser.parse)
    
    sparkSession.internalCreateDataFrame(parsed, schema, isStreaming = jsonDataset.isStreaming)
  

您可以以编程方式实现功能。 使用 sc.textFile 读取文件夹中的所有文件。 foreach 文件使用 xml 解析器解析条目。

如果它的有效重定向到另一个路径。

如果无效,则写入坏记录路径。

【讨论】:

我只是在等待其他人提供任何其他选择:)

以上是关于使用 spark 读取 xml 时如何识别或重新路由错误的 xml的主要内容,如果未能解决你的问题,请参考以下文章

如何将 Spark 消耗的最新偏移量保存到 ZK 或 Kafka 并在重启后可以读取

在 Spark 中读取 XML

如何在使用 Spark 读取时将数据分配到 X 分区?

使用 CLIXML 的 Spark 数据框

如何在 spark-xml 中禁用科学计数法

Spark 中的 XML 处理