创建数据集时 Spark 无法反序列化记录

Posted

技术标签:

【中文标题】创建数据集时 Spark 无法反序列化记录【英文标题】:Spark failing to deserialize a record when creating Dataset 【发布时间】:2018-12-17 00:23:31 【问题描述】:

我正在从 S3 读取大量 CSV(所有内容都在一个键前缀下)并创建一个强类型 Dataset

val events: DataFrame = cdcFs.getStream()
events
  .withColumn("event", lit("I"))
  .withColumn("source", lit(sourceName))
  .as[TradeRecord]

其中TradeRecord 是一个案例类,通常可以通过 SparkSession 隐式反序列化。但是,对于某个批次,记录无法反序列化。这是错误(省略了堆栈跟踪)

Caused by: java.lang.NullPointerException: Null value appeared in non-nullable field:
- field (class: "scala.Long", name: "deal")
- root class: "com.company.trades.TradeRecord"
If the schema is inferred from a Scala tuple/case class, or a Java bean, please try to use scala.Option[_] or other nullable types (e.g. java.lang.Integer instead of int/scala.Int).

dealTradeRecord 的一个字段,在源数据(S3 对象)中永远不应为空,因此它不是 Option

不幸的是,错误消息没有给我任何关于 CSV 数据是什么样子的线索,甚至没有给我任何关于它来自哪个 CSV 文件的线索。该批次包含数百个文件,因此我需要一种方法将其缩小到最多几个文件来调查问题。

【问题讨论】:

Spark 2 Dataset Null value exception的可能重复 不同的问题。我理解错误的含义。我需要查看传入的数据是什么样的(是否缺少一列?csv 中是否有额外的逗号?),这意味着我需要知道哪个文件包含损坏的记录。 如果您想缩小范围,我建议您跳过as 并使用input_file_name 来查明确切的来源。 查找这个名为 columnNameOfCorruptRecord 的选项。它将使用此列标记记录损坏记录。您需要在案例类或模式中添加一个字段,并将其设置为选项的值。我还有另一个想法,使用名为 from_file 的 spark-sql 函数,稍后我将尝试解释。 【参考方案1】:

作为suggested user10465355,您可以加载数据:

val events: DataFrame = ???

过滤器

val mismatched = events.where($"deal".isNull)

添加文件名

import org.apache.spark.sql.functions.input_file_name

val tagged = mismatched.withColumn("_file_name", input_file_name)

可选地添加块和块和偏移量:

import org.apache.spark.sql.functions.spark_partition_id, monotonically_increasing_id, shiftLeft, shiftRight

df
  .withColumn("chunk", spark_partition_id())
  .withColumn(
    "offset",
    monotonically_increasing_id - shiftLeft(shiftRight(monotonically_increasing_id, 33), 33))

【讨论】:

【参考方案2】:

这是我想出的解决方案(我正在使用 Spark Structured Streaming):

val stream = spark.readStream
  .format("csv")
  .schema(schema) // a StructType defined elsewhere
  .option("mode", "PERMISSIVE")
  .option("columnNameOfCorruptRecord", "corruptRecord")
  .load(path)

// If debugging, check for any corrupted CSVs
if (log.isDebugEnabled)  // org.apache.spark.internal.Logging trait 
  import spark.implicits._
  stream
    .filter($"corruptRecord".isNotNull)
    .withColumn("input_file", input_file_name)
    .select($"input_file", $"corruptRecord")
    .writeStream
    .format("console")
    .option("truncate", false)
    .start()


val events = stream
  .withColumn("event", lit("I"))
  .withColumn("source", lit(sourceName))
  .as[TradeRecord]

基本上,如果 Spark 日志级别设置为 Debug 或更低,则会检查 DataFrame 中是否存在损坏的记录,并将任何此类记录与其文件名一起打印出来。最终,程序尝试将此 DataFrame 强制转换为强类型 Dataset[TradeRecord] 并失败。

【讨论】:

以上是关于创建数据集时 Spark 无法反序列化记录的主要内容,如果未能解决你的问题,请参考以下文章

反序列化 Avro Spark

0016-Avro序列化&反序列化和Spark读取Avro数据

如何在 Jackson 中记录 JSON 反序列化

Protobuf-Net 无法在没有无参数构造函数的情况下反序列化记录

RestSharp 显示无法创建接口实例的错误必须手动反序列化

来自示例 Java 程序的 Spark UDF 反序列化错误